# 🧠 Bipedal Walker Strategy Fine-Tuning with Llama-3.2-3B

This notebook loads Llama-3.2-3B-Instruct with `unsloth`, attaches LoRA adapters, and trains the model
with GRPO to write Python **control strategies** for the BipedalWalker environment. Each generated strategy is scored by running it in the environment.

It includes:
- Model + tokenizer loading  
- Gym environment integration  
- Reward evaluation functions  
- GRPO training setup  


## 1. Setup and Model Loading

In [1]:
BASE_ID = "unsloth/Llama-3.2-3B-Instruct"

from unsloth import FastLanguageModel
import torch

# Model hyperparameters
max_seq_length = 2048
lora_rank = 128

# Load base model and tokenizer
model, tokenizer = FastLanguageModel.from_pretrained(
    model_name=BASE_ID,
    max_seq_length=max_seq_length,
    load_in_4bit=False,
    load_in_8bit=False,
    full_finetuning=False,
)

🦥 Unsloth: Will patch your computer to enable 2x faster free finetuning.



🦥 Unsloth Zoo will now patch everything to make training faster!
Unsloth: AMD currently is not stable with 4bit bitsandbytes. Disabling for now.
==((====))==  Unsloth 2025.10.9: Fast Llama patching. Transformers: 4.56.2.
   \\   /|    AMD Radeon Graphics. Num GPUs = 1. Max memory: 191.688 GB. Platform: Linux.
O^O/ \_/ \    Torch: 2.8.0+rocm6.4. ROCm Toolkit: 6.4.43482-0f2d60242. Triton: 3.4.0
\        /    Bfloat16 = TRUE. FA [Xformers = 0.0.32.post2. FA2 = False]
 "-____-"     Free license: http://github.com/unslothai/unsloth
Unsloth: Fast downloading is enabled - ignore downloading bars which are red colored!


Loading checkpoint shards: 100% 2/2 [00:02<00:00,  1.22s/it]


## 2. Apply LoRA Adaptation

In [2]:
model = FastLanguageModel.get_peft_model(
    model,
    r=lora_rank,
    target_modules=[
        "q_proj", "k_proj", "v_proj", "o_proj",
        "gate_proj", "up_proj", "down_proj",
    ],
    lora_alpha=lora_rank,
    use_gradient_checkpointing="unsloth",
    random_state=3407,
)

Unsloth 2025.10.9 patched 28 layers with 28 QKV layers, 28 O layers and 28 MLP layers.
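To get a feel for what `r=128` means in practice, the sketch below counts the adapter parameters added by LoRA. The projection shapes are assumptions about Llama-3.2-3B (hidden size 3072, key/value width 1024, MLP width 8192) and are not stated anywhere in this notebook; only the 28 layers and the seven target modules come from the cell above. `lora_param_count` is a hypothetical helper, not part of `unsloth`.

```python
# Assumed Llama-3.2-3B projection shapes as (d_in, d_out); not stated in this notebook.
HIDDEN, KV_DIM, MLP_DIM = 3072, 1024, 8192
PROJECTION_SHAPES = {
    "q_proj": (HIDDEN, HIDDEN),
    "k_proj": (HIDDEN, KV_DIM),
    "v_proj": (HIDDEN, KV_DIM),
    "o_proj": (HIDDEN, HIDDEN),
    "gate_proj": (HIDDEN, MLP_DIM),
    "up_proj": (HIDDEN, MLP_DIM),
    "down_proj": (MLP_DIM, HIDDEN),
}


def lora_param_count(rank: int, num_layers: int = 28) -> int:
    """Each adapted weight gains A (rank x d_in) and B (d_out x rank)."""
    per_layer = sum(rank * (d_in + d_out) for d_in, d_out in PROJECTION_SHAPES.values())
    return per_layer * num_layers


lora_rank = 128
lora_alpha = lora_rank
print("LoRA scaling (alpha / r):", lora_alpha / lora_rank)
print("Trainable LoRA parameters:", lora_param_count(lora_rank))
```

Under these assumed shapes, setting `lora_alpha` equal to `lora_rank` gives a scaling factor of 1, so the adapter update is added to the base weights unscaled.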


## 3. Gym Environment Setup

In [3]:
from envs.gym_environment import GymEnvironment, GymAction

base_url = "http://localhost:9000"
request_timeout_s = 1000

env = GymEnvironment(base_url=base_url, request_timeout_s=request_timeout_s)
state = env.reset()
print("Environment reset:", state)

# Take one step to verify environment
state = env.step(GymAction(action=[0.1] * 4))
print("Step result:", state)

Environment reset: StepResult(observation=GymObservation(done=False, reward=None, metadata={}, state=[0.002747555961832404, -2.382378625043202e-06, 0.0001853278954513371, -0.01599998027086258, 0.09211806952953339, -0.0002445672871544957, 0.8601776361465454, 0.0015840515261515975, 1.0, 0.032523151487112045, -0.0002445560530759394, 0.8537275195121765, 0.0001730085350573063, 1.0, 0.44081392884254456, 0.44582003355026245, 0.4614226818084717, 0.48955008387565613, 0.5341026782989502, 0.6024609208106995, 0.7091487646102905, 0.885931670665741, 1.0, 1.0], legal_actions={'low': [-1.0, -1.0, -1.0, -1.0], 'high': [1.0, 1.0, 1.0, 1.0]}, episode_length=0, total_reward=0.0, frame=None), reward=None, done=False)
Step result: StepResult(observation=GymObservation(done=False, reward=-0.1063923373310528, metadata={}, state=[-0.008335273712873459, -0.006122593302279711, -0.00252687931060791, 0.0007477727485820651, 0.43431541323661804, 0.010125350207090378, 0.12396615743637085, 0.04638111591339111, 1.0, 0.
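The reset output lists `legal_actions` with a low of -1.0 and a high of 1.0 for each of the four joints. A generated strategy has no guarantee of respecting those bounds, so one option is to clamp its output before calling `env.step`. The helper below is a hypothetical sketch and is not part of `GymEnvironment`.

```python
def clip_action(action, low=-1.0, high=1.0, size=4):
    """Coerce a strategy's output into a list of `size` floats within [low, high]."""
    if len(action) != size:
        raise ValueError(f"expected {size} values, got {len(action)}")
    return [min(high, max(low, float(a))) for a in action]


print(clip_action([0.1, 1.7, -3, 0]))  # [0.1, 1.0, -1.0, 0.0]
```

Raising on a wrong-length action, rather than padding it, keeps malformed strategies visible to the reward function.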

## 4. Strategy Execution Helpers

In [4]:
from typing import Callable
from unsloth import execute_with_time_limit

def _execute_strategy(strategy: Callable, env: GymEnvironment, return_frames=False):
    """Execute a given strategy function within the gym environment."""
    if not callable(strategy):
        raise ValueError("Invalid strategy: expected a callable")
    actions, observations, frames = [], [], []
    obs = env.reset().observation.state
    total, steps, done = 0.0, 0, False

    while not done:
        a = strategy(obs)
        observations.append(obs)
        actions.append(a)
        step_result = env.step(GymAction(action=a, return_frame=return_frames))
        total += step_result.observation.reward
        done = step_result.done
        obs = step_result.observation.state
        steps += 1
        if return_frames:
            frames.append(step_result.observation.frame)
    print(f"Ran strategy for {steps} steps -> Scored {total}")
    return steps, total, frames

@execute_with_time_limit(5)
def execute_strategy(strategy: Callable, env, return_frames=False):
    """Run a strategy with timeout protection."""
    return _execute_strategy(strategy, env, return_frames)
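The rollout loop above can be exercised without the environment server. The sketch below uses `StubEnv`, a hypothetical stand-in that ends an episode after a fixed number of steps and hands out a toy torque penalty as reward. It also adds a `max_steps` cap, which the notebook's helper does not have, as a second guard alongside the time limit.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class StubStep:
    state: List[float]
    reward: float
    done: bool


class StubEnv:
    """Hypothetical stand-in for GymEnvironment: ends after `horizon` steps."""

    def __init__(self, horizon: int = 5):
        self.horizon = horizon
        self.t = 0

    def reset(self) -> List[float]:
        self.t = 0
        return [0.0] * 24

    def step(self, action: List[float]) -> StubStep:
        self.t += 1
        reward = -sum(abs(a) for a in action)  # toy reward: penalize torque magnitude
        return StubStep([0.0] * 24, reward, self.t >= self.horizon)


def rollout(strategy: Callable, env: StubEnv, max_steps: int = 1000):
    """Accumulate reward until the episode ends or the step cap is reached."""
    obs, total, steps, done = env.reset(), 0.0, 0, False
    while not done and steps < max_steps:
        result = env.step(strategy(obs))
        total += result.reward
        obs, done = result.state, result.done
        steps += 1
    return steps, total


steps, total = rollout(lambda obs: [0.5, 0.0, -0.5, 0.0], StubEnv(horizon=5))
print(steps, total)  # 5 -5.0
```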

## 5. Utilities

In [5]:
import re

def extract_python_code(md_text: str) -> list[str]:
    """Extract all Python code blocks from markdown text."""
    pattern = r"```(?:python)?\s*([\s\S]*?)\s*```"
    return [block.strip() for block in re.findall(pattern, md_text, re.IGNORECASE) if block.strip()]
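A quick check of how the extraction behaves on typical model output. The sketch repeats the same pattern, building the fence marker from single backticks so the example stays readable. The sample strings are made up for illustration.

```python
import re

FENCE = "`" * 3  # the three-backtick markdown fence


def extract_python_code(md_text: str) -> list[str]:
    """Same regex as above, assembled from FENCE."""
    pattern = FENCE + r"(?:python)?\s*([\s\S]*?)\s*" + FENCE
    return [b.strip() for b in re.findall(pattern, md_text, re.IGNORECASE) if b.strip()]


complete = (
    "Here is my strategy:\n"
    + FENCE + "python\ndef strategy(obs):\n    return [0, 0, 0, 0]\n" + FENCE
    + "\nDone."
)
truncated = "Sure:\n" + FENCE + "python\ndef strategy(obs):\n    return [0, 0"

blocks = extract_python_code(complete)
print(blocks)
print(extract_python_code(truncated))  # [] because the closing fence is missing
```

The second case matters for training: a completion that hits the token limit before closing its fence yields no block at all, so it receives the "no code found" penalty rather than a partial score.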

## 6. User Prompt for LLM

In [6]:
USER_PROMPT = """
Create a new short **Bipedal Walker agent strategy** using only native Python code.
You are given a 24-element list representing the current observation from the environment:
`[hull_angle, hull_angular_velocity, horizontal_speed, vertical_speed, hip1_angle, hip1_speed, hip2_angle, hip2_speed, leg1_contact, hip3_angle, hip3_speed, hip4_angle, hip4_speed, leg2_contact, lidar_1, lidar_2, lidar_3, lidar_4, lidar_5, lidar_6, lidar_7, lidar_8, lidar_9, lidar_10]`

Output one action as a list of 4 continuous values in the range `[-1.0, 1.0]`, corresponding to the motor speeds of the walker’s four joints:
`[hip1, knee1, hip2, knee2]`.

The goal is to move forward smoothly through uneven terrain without falling, keeping balance, maintaining momentum, and minimizing unnecessary torque.

Follow these rules for better performance:

1. **Balance First:** Keep the hull angle near zero and angular velocity small to stay upright.
2. **Step Rhythmically:** Alternate leg movements to create a walking rhythm.
3. **Gentle Force:** Avoid sudden, large torques — use small, smooth joint movements.
4. **Forward Motion:** Encourage slight forward horizontal speed while avoiding high vertical speed.
5. **Terrain Awareness:** Use lidar readings to detect upcoming slopes and adjust the step accordingly.
6. **Recovery Logic:** If the hull tilts too much or a leg loses contact, adjust opposite leg torque to stabilize.

Output your new short function in backticks using the format below:

```python
def strategy(obs) -> list[float]:
    return [0, 0, 0, 0]  # Example
```

All helper logic should be inside `def strategy`.
Only output the short function `strategy`.
"""

print(USER_PROMPT)




## 7. Dataset Preparation

In [7]:

from datasets import Dataset

dataset = Dataset.from_list([{"prompt": [{"role": "user", "content": USER_PROMPT}]}] * 1000)
maximum_length = len(tokenizer.apply_chat_template(dataset[0]["prompt"], add_generation_prompt=True))
print("Max prompt length:", maximum_length)

Max prompt length: 447



## 8. Strategy Generation Function

In [8]:

def generate_strategy(model, tokenizer, prompt, max_new_tokens=512):
    """Generate a strategy function with the given model."""
    text = tokenizer.apply_chat_template(prompt, tokenize=False, add_generation_prompt=True)
    # The chat template already inserts special tokens, so do not add them again.
    inputs = tokenizer(text, return_tensors="pt", add_special_tokens=False).to(model.device)
    with torch.no_grad():
        out = model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            temperature=1.0,
            do_sample=True,
        )

    # Slice off exactly the tokens that were fed in, so only the completion is decoded.
    new_tokens = out[0][inputs["input_ids"].shape[-1]:]
    return tokenizer.decode(new_tokens, skip_special_tokens=True)
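The slice offset has to be measured on the same token sequence that is passed to `generate`, including the assistant header added by `add_generation_prompt=True`. The sketch below illustrates why, using plain lists as stand-ins for tensors. The token ids are invented, with 99 playing the role of the assistant header.

```python
def completion_only(output_ids, prompt_ids):
    """Return the tokens generated after the prompt (lists stand in for tensors)."""
    if output_ids[:len(prompt_ids)] != prompt_ids:
        raise ValueError("output must start with the prompt")
    return output_ids[len(prompt_ids):]


prompt_with_header = [1, 15, 27, 99]  # hypothetical ids; 99 is the assistant header
generated = prompt_with_header + [42, 43, 44]

print(completion_only(generated, prompt_with_header))  # [42, 43, 44]

# Measuring a prompt that lacks the header makes the slice start one token early,
# so the header leaks into the decoded text:
print(generated[len(prompt_with_header) - 1:])  # [99, 42, 43, 44]
```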



## 9. Evaluation and Reward Functions

In [9]:

import numpy as np
from unsloth import check_python_modules, create_locked_down_function


def score_function(steps, reward):
    """Map an episode to a non-negative score: 2 * (120 + reward) plus a small 10 / steps bonus."""
    return max(0, (120 + reward) * 2 + (1 / steps) * 10)

def _run_responses(responses):
    """Safely evaluate generated strategies in the environment."""
    scores = []
    for response in responses:
        function_list = extract_python_code(response)
        if not function_list:
            scores.append(-5)
            continue

        function = function_list[-1]
        ok, info = check_python_modules(function)
        if "error" in info:
            scores.append(-5)
            continue

        try:
            new_strategy = create_locked_down_function(function)
            steps, reward, _ = execute_strategy(new_strategy, env)
            scores.append(score_function(steps, reward))
        except TimeoutError:
            scores.append(-2.0)
        except Exception as e:
            print(f"Failed to execute strategy: {e}")
            scores.append(-3.0)
    return scores


def strategy_succeeds(completions, **kwargs):
    """GRPO reward: score each completion by running its strategy in the environment."""
    responses = [completion[0]["content"] for completion in completions]
    return _run_responses(responses)

PRINTER = 0


def function_works(completions, **kwargs):
    """GRPO reward: check that each completion contains a loadable strategy function."""
    global PRINTER
    scores = []
    responses = [completion[0]["content"] for completion in completions]

    for resp in responses:
        # Log a short snippet of every fifth response for inspection.
        if PRINTER % 5 == 0:
            print("SNIPPET: " + resp[:100] + "\n--------------\n")
        PRINTER += 1

        function = extract_python_code(resp)
        info = {}
        if function:
            ok, info = check_python_modules(function[-1])
            print(function[-1] + "\n--------------\n")
        if not function or "error" in info:
            score = -2.0  # no code block, or the module check reported an error
        else:
            try:
                create_locked_down_function(function[-1])
                score = 2.0
            except Exception:
                score = -0.5  # code was found but could not be turned into a function
        scores.append(score)
    return scores
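A worked example of `score_function`, using the episode reported for the fine-tuned model in section 14 (83 steps, total reward of about -116). The second call uses a made-up episode to show the floor at zero.

```python
def score_function(steps: int, reward: float) -> float:
    """Same formula as above: 2 * (120 + reward) + 10 / steps, floored at 0."""
    return max(0, (120 + reward) * 2 + (1 / steps) * 10)


# Values printed by the fine-tuned run in this notebook: roughly 8.125.
print(score_function(83, -115.99754700483554))

# Hypothetical episode: any total reward far enough below -120 is floored at 0.
print(score_function(100, -130.0))  # 0
```

Because an executed strategy never scores below 0, it always ranks above a timeout (-2.0), a runtime failure (-3.0), or a completion with no usable code (-5).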

## 10. Test Base Model


In [10]:
generated_function = generate_strategy(model, tokenizer, [{"role": "user", "content": USER_PROMPT}], max_new_tokens=1024)
print(generated_function)

assistant

```python
def strategy(obs):
    hull_angle = obs[0]
    angular_velocity = obs[1]
    horizontal_speed = obs[2]
    vertical_speed = obs[3]
    hip1_angle = obs[4]
    hip1_speed = obs[5]
    hip2_angle = obs[6]
    hip2_speed = obs[7]
    leg1_contact = obs[8]
    hip3_angle = obs[9]
    hip3_speed = obs[10]
    hip4_angle = obs[11]
    hip4_speed = obs[12]
    leg2_contact = obs[13]
    lidar_1 = obs[14]
    lidar_2 = obs[15]
    lidar_3 = obs[16]
    lidar_4 = obs[17]
    lidar_5 = obs[18]
    lidar_6 = obs[19]
    lidar_7 = obs[20]
    lidar_8 = obs[21]
    lidar_9 = obs[22]
    lidar_10 = obs[23]

    # Balance First
    if abs(hull_angle) > 0.5:
        hip1_speed = max(-abs(hip1_speed), 0)
        hip2_speed = max(-abs(hip2_speed), 0)
        hip3_speed = max(-abs(hip3_speed), 0)
        hip4_speed = max(-abs(hip4_speed), 0)

    # Step Rhythmically
    if leg1_contact:
        hip2_speed = -max(hip2_speed, 0)
    if not leg1_contact:
        hip1_speed = -max(hip1_s

In [11]:
input_to_reward = [[{"content": generated_function}]]
print(f"Strategy reward:  {strategy_succeeds(input_to_reward)}")
print(f"Function correctness reward:  {function_works(input_to_reward)}")

Failed to execute strategy: 500 Server Error: Internal Server Error for url: http://localhost:9000/step
Strategy reward:  [-3.0]
SNIPPET: assistant

```python
def strategy(obs):
    hull_angle = obs[0]
    angular_velocity = obs[1]
    ho
--------------

def strategy(obs):
    hull_angle = obs[0]
    angular_velocity = obs[1]
    horizontal_speed = obs[2]
    vertical_speed = obs[3]
    hip1_angle = obs[4]
    hip1_speed = obs[5]
    hip2_angle = obs[6]
    hip2_speed = obs[7]
    leg1_contact = obs[8]
    hip3_angle = obs[9]
    hip3_speed = obs[10]
    hip4_angle = obs[11]
    hip4_speed = obs[12]
    leg2_contact = obs[13]
    lidar_1 = obs[14]
    lidar_2 = obs[15]
    lidar_3 = obs[16]
    lidar_4 = obs[17]
    lidar_5 = obs[18]
    lidar_6 = obs[19]
    lidar_7 = obs[20]
    lidar_8 = obs[21]
    lidar_9 = obs[22]
    lidar_10 = obs[23]

    # Balance First
    if abs(hull_angle) > 0.5:
        hip1_speed = max(-abs(hip1_speed), 0)
        hip2_speed = max(-abs(hip2_speed), 0)
  


## 11. GRPO Trainer Configuration

In [12]:

from trl import GRPOConfig, GRPOTrainer

max_prompt_length = maximum_length + 1
max_completion_length = max_seq_length - max_prompt_length

training_args = GRPOConfig(
    temperature=1.0,
    min_p=0.1,
    learning_rate=5e-6,
    weight_decay=0.01,
    warmup_ratio=0.1,
    lr_scheduler_type="linear",
    optim="adamw_8bit",
    logging_steps=1,
    per_device_train_batch_size=4,
    gradient_accumulation_steps=1,
    num_generations=4,
    max_prompt_length=max_prompt_length,
    max_completion_length=max_completion_length,
    max_steps=400,
    save_steps=50,
    report_to="none",
    output_dir="outputs_llama_bipedal",
)
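The length and batch settings above follow from simple arithmetic, reproduced here with the values printed earlier in the notebook. The last two lines assume a single GPU (as the Unsloth banner reports) and that each step's completions are split into groups of `num_generations`. Treat that grouping as an assumption about how the trainer batches prompts, not a statement of its internals.

```python
max_seq_length = 2048
maximum_length = 447  # prompt length printed in section 7

max_prompt_length = maximum_length + 1
max_completion_length = max_seq_length - max_prompt_length
print(max_prompt_length, max_completion_length)  # 448 1600

per_device_train_batch_size = 4
gradient_accumulation_steps = 1
num_generations = 4
num_devices = 1  # assumption: single GPU

completions_per_step = per_device_train_batch_size * gradient_accumulation_steps * num_devices
prompts_per_step = completions_per_step // num_generations
print(completions_per_step, prompts_per_step)  # 4 1
```

Under these assumptions, each of the 400 steps samples four completions for one prompt, and each completion may use up to 1600 tokens.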


## 12. Initialize Trainer and Train

In [15]:

trainer = GRPOTrainer(
    model=model,
    processing_class=tokenizer,
    reward_funcs=[
        strategy_succeeds,
        function_works,
    ],
    args=training_args,
    train_dataset=dataset,
)

trainer.train()
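GRPO compares completions generated for the same prompt against each other, not against an absolute baseline. The sketch below shows the core idea for one group of four completions: sum the two reward functions, then normalize within the group. It is a simplified illustration with made-up scores, and the trainer's exact normalization may differ. `group_advantages` is a hypothetical helper.

```python
from statistics import mean, pstdev


def group_advantages(rewards, eps=1e-4):
    """Center and scale rewards within one group of completions."""
    mu, sigma = mean(rewards), pstdev(rewards)
    return [(r - mu) / (sigma + eps) for r in rewards]


# Made-up scores for four completions of the same prompt.
strategy_scores = [8.1, -3.0, -5.0, 0.0]  # in the style of strategy_succeeds
format_scores = [2.0, 2.0, -2.0, 2.0]     # in the style of function_works

totals = [s + f for s, f in zip(strategy_scores, format_scores)]
advantages = group_advantages(totals)
print([round(a, 3) for a in advantages])
```

Completions with a positive advantage are reinforced and those with a negative one are discouraged, so a group in which every completion fails identically provides almost no learning signal.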


## 13. Save the Merged Model

In [None]:

model.save_pretrained_merged("llama_bipedal_final", tokenizer, save_method="merged_16bit")
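Saving a "merged" model means folding the adapter back into the base weights so the result loads like an ordinary checkpoint. The sketch below shows the standard LoRA merge, W' = W + (alpha / r) · B · A, on tiny hand-written matrices in pure Python. It illustrates the arithmetic only and does not describe how `save_pretrained_merged` is implemented. `matmul` and `merge_lora` are hypothetical helpers.

```python
def matmul(a, b):
    """Multiply two matrices given as lists of rows."""
    return [[sum(x * y for x, y in zip(row, col)) for col in zip(*b)] for row in a]


def merge_lora(weight, lora_a, lora_b, alpha, rank):
    """Return W + (alpha / rank) * B @ A."""
    scale = alpha / rank
    delta = matmul(lora_b, lora_a)
    return [
        [w + scale * d for w, d in zip(w_row, d_row)]
        for w_row, d_row in zip(weight, delta)
    ]


W = [[1.0, 0.0, 0.0], [0.0, 1.0, 0.0]]  # d_out x d_in = 2 x 3
A = [[1.0, 2.0, 3.0]]                   # rank x d_in = 1 x 3
B = [[0.5], [-1.0]]                     # d_out x rank = 2 x 1

merged = merge_lora(W, A, B, alpha=1, rank=1)
print(merged)  # [[1.5, 1.0, 1.5], [-1.0, -1.0, -3.0]]
```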

## 14. Evaluate Fine-Tuned Model

In [18]:
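# NOTE: this loads "./llama_bipedal_2", whereas section 13 saves to "llama_bipedal_final";
# point model_name at the checkpoint you actually want to evaluate.
model_finetuned, _ = FastLanguageModel.from_pretrained(
    model_name="./llama_bipedal_2",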
    max_seq_length=max_seq_length,
    load_in_4bit=False,
    load_in_8bit=False,
    full_finetuning=False,
)

Unsloth: AMD currently is not stable with 4bit bitsandbytes. Disabling for now.
==((====))==  Unsloth 2025.10.9: Fast Llama patching. Transformers: 4.56.2.
   \\   /|    AMD Radeon Graphics. Num GPUs = 1. Max memory: 191.688 GB. Platform: Linux.
O^O/ \_/ \    Torch: 2.8.0+rocm6.4. ROCm Toolkit: 6.4.43482-0f2d60242. Triton: 3.4.0
\        /    Bfloat16 = TRUE. FA [Xformers = 0.0.32.post2. FA2 = False]
 "-____-"     Free license: http://github.com/unslothai/unsloth
Unsloth: Fast downloading is enabled - ignore downloading bars which are red colored!


Loading checkpoint shards: 100% 2/2 [00:02<00:00,  1.20s/it]


In [19]:
# Enable faster inference
model_finetuned = FastLanguageModel.for_inference(model_finetuned)

In [22]:
generated_function = generate_strategy(model_finetuned, tokenizer, [{"role": "user", "content": USER_PROMPT}], max_new_tokens=1024)
print(generated_function)

>
<|im_start|>assistant
```python
def strategy(obs) -> list[float]:
    hull_angle = obs[0]
    hull_angular_velocity = obs[1]
    horizontal_speed = obs[2]
    vertical_speed = abs(obs[3])  # take absolute value
    hip1_angle, hip1_speed = obs[4:6]
    hip2_angle, hip2_speed = obs[6:8]
    leg1_contact, hip3_angle, hip3_speed = obs[8:11]
    hip4_angle, hip4_speed = obs[11:13]
    leg2_contact = obs[13]
    lidar_1, lidar_2, lidar_3, lidar_4, lidar_5, lidar_6, lidar_7, lidar_8, lidar_9, lidar_10 = obs[14:25]

    # Balance First
    if abs(hull_angle) > 0.1:
        hip1_speed = 0
        hip2_speed = 0
    else:
        hip1_speed = 0.1
        hip2_speed = -0.1

    # Step Rhythmically
    if leg1_contact:
        hip3_speed = -0.1
        hip4_speed = 0.1
    else:
        hip3_speed = 0
        hip4_speed = 0

    # Gentle Force
    hip1_speed = max(-0.5, min(0.5, hip1_speed))
    hip2_speed = max(-0.5, min(0.5, hip2_speed))
    hip3_speed = max(-0.5, min(0.5, hip3_speed))
    hi

In [23]:
input_to_reward = [[{"content": generated_function}]]
print(f"Strategy reward:  {strategy_succeeds(input_to_reward)}")
print(f"Function correctness reward:  {function_works(input_to_reward)}")

Ran strategy for 83 steps -> Scored -115.99754700483554
Strategy reward:  [8.125387918039765]
def strategy(obs) -> list[float]:
    hull_angle = obs[0]
    hull_angular_velocity = obs[1]
    horizontal_speed = obs[2]
    vertical_speed = abs(obs[3])  # take absolute value
    hip1_angle, hip1_speed = obs[4:6]
    hip2_angle, hip2_speed = obs[6:8]
    leg1_contact, hip3_angle, hip3_speed = obs[8:11]
    hip4_angle, hip4_speed = obs[11:13]
    leg2_contact = obs[13]
    lidar_1, lidar_2, lidar_3, lidar_4, lidar_5, lidar_6, lidar_7, lidar_8, lidar_9, lidar_10 = obs[14:25]

    # Balance First
    if abs(hull_angle) > 0.1:
        hip1_speed = 0
        hip2_speed = 0
    else:
        hip1_speed = 0.1
        hip2_speed = -0.1

    # Step Rhythmically
    if leg1_contact:
        hip3_speed = -0.1
        hip4_speed = 0.1
    else:
        hip3_speed = 0
        hip4_speed = 0

    # Gentle Force
    hip1_speed = max(-0.5, min(0.5, hip1_speed))
    hip2_speed = max(-0.5, min(0.5, hip2_spe