<a href="https://colab.research.google.com/github/unverciftci/RL_LLM/blob/main/RL_LLM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# LLM Reinforcement Learning Agent Demo
# Run this in Google Colab - it uses Qwen3-0.6B to learn maze solving through trial and error

# !pip install transformers torch accelerate

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import clear_output, display
from collections import deque
import json
import time

# Load the tokenizer and causal-LM weights for Qwen3-0.6B (small enough for Colab).
print("Loading Qwen3-0.6B model...")
model_name = "Qwen/Qwen3-0.6B"
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Loading options kept in one place: 16-bit float weights, and "auto" device
# placement (presumably resolved by the library to the available hardware --
# see the transformers `from_pretrained` documentation).
model_load_kwargs = {
    "torch_dtype": torch.float16,
    "device_map": "auto",
}
model = AutoModelForCausalLM.from_pretrained(model_name, **model_load_kwargs)

# Simple Maze Environment
class MazeEnv:
    """Small grid maze with a fixed wall layout.

    Positions are ``[row, col]`` lists. The agent starts at ``[0, 0]`` and the
    goal is the cell ``[size-1, size-1]``. "North"/"South" decrease/increase
    the row, "West"/"East" decrease/increase the column.

    Rewards returned by :meth:`step`:
        * ``10``   -- the move lands on the goal.
        * ``-0.5`` -- the move was blocked (interior wall, grid edge, or an
          unrecognised action); the agent stays where it was.
        * ``-0.1`` -- any other successful move.

    Args:
        size: Side length of the square grid. Must be at least ``MIN_SIZE``.

    Raises:
        ValueError: If ``size`` is smaller than ``MIN_SIZE``.
    """

    # (row, col) offset per action. Insertion order matters: it is the order
    # in which blocked directions are listed in the state description.
    MOVES = {
        'North': (-1, 0),
        'South': (1, 0),
        'East': (0, 1),
        'West': (0, -1),
    }

    # An episode is cut off once the step counter exceeds this value.
    STEP_LIMIT = 50

    # The hard-coded walls index row 3 / column 3 and seal off the goal on a
    # 4x4 grid, so smaller grids are either out of range or unsolvable.
    MIN_SIZE = 5

    def __init__(self, size=5):
        if size < self.MIN_SIZE:
            raise ValueError(
                f"size must be >= {self.MIN_SIZE} for the built-in wall layout, got {size}"
            )
        self.size = size
        self.reset()

    def reset(self):
        """Rebuild the maze, move the agent to the start, and return the state text."""
        self.player_pos = [0, 0]
        self.goal_pos = [self.size - 1, self.size - 1]

        # Create walls (1 = wall, 0 = empty)
        self.maze = np.zeros((self.size, self.size))
        self.maze[1, 1:4] = 1
        self.maze[2, 2] = 1
        self.maze[3, 0:3] = 1

        self.steps = 0
        return self.get_state_description()

    def _is_blocked(self, row, col):
        """Return True if (row, col) is outside the grid or is a wall cell."""
        inside = 0 <= row < self.size and 0 <= col < self.size
        return (not inside) or self.maze[row, col] == 1

    def get_state_description(self):
        """Describe the agent's position, blocked directions, and goal direction.

        Returns:
            A string such as
            ``"Position: (0,0). Walls at: North, West. Goal is South and East"``.
            Grid edges are reported as walls.
        """
        x, y = self.player_pos  # x is the row, y is the column

        walls = [
            name
            for name, (d_row, d_col) in self.MOVES.items()
            if self._is_blocked(x + d_row, y + d_col)
        ]

        # Direction(s) of the goal relative to the agent
        goal_dir = []
        if self.goal_pos[0] < x:
            goal_dir.append("North")
        if self.goal_pos[0] > x:
            goal_dir.append("South")
        if self.goal_pos[1] < y:
            goal_dir.append("West")
        if self.goal_pos[1] > y:
            goal_dir.append("East")

        state = f"Position: ({x},{y}). "
        if walls:
            state += f"Walls at: {', '.join(walls)}. "
        state += "Goal is "
        state += ' and '.join(goal_dir) if goal_dir else "HERE!"

        return state

    def step(self, action):
        """Apply one action and advance the step counter.

        Args:
            action: One of ``'North'``, ``'South'``, ``'East'``, ``'West'``.
                Anything else is treated as a blocked move.

        Returns:
            Tuple ``(state_description, reward, done)``. ``done`` is True when
            the agent is on the goal or the step counter exceeds ``STEP_LIMIT``.
        """
        # str() so numpy string scalars (e.g. from np.random.choice) look up cleanly.
        offset = self.MOVES.get(str(action))

        moved = False
        if offset is not None:
            target_row = self.player_pos[0] + offset[0]
            target_col = self.player_pos[1] + offset[1]
            if not self._is_blocked(target_row, target_col):
                self.player_pos = [target_row, target_col]
                moved = True

        if not moved:
            # Grid edges are described to the agent as walls, so bumping into
            # one is penalised exactly like bumping into an interior wall.
            reward = -0.5
        elif self.player_pos == self.goal_pos:
            reward = 10  # Big reward for reaching goal
        else:
            reward = -0.1  # Small penalty for each step

        self.steps += 1
        done = (self.player_pos == self.goal_pos) or (self.steps > self.STEP_LIMIT)

        return self.get_state_description(), reward, done

    def render(self):
        """Show the maze with matplotlib: walls 0.5, goal 0.8, player 1."""
        maze_display = np.zeros((self.size, self.size))
        maze_display[self.maze == 1] = 0.5  # Walls
        maze_display[self.goal_pos[0], self.goal_pos[1]] = 0.8  # Goal
        # Player is drawn last so it stays visible when standing on the goal.
        maze_display[self.player_pos[0], self.player_pos[1]] = 1  # Player

        plt.imshow(maze_display, cmap='coolwarm')
        plt.title(f"Step: {self.steps}")
        plt.axis('off')
        plt.show()

# LLM Agent with Memory
class LLMAgent:
    """Agent that picks maze actions by prompting a language model.

    Recent transitions are kept in a bounded memory and replayed as text in
    the prompt so the model can condition on what happened before.

    Args:
        model: Object exposing ``generate(...)`` and a ``device`` attribute.
        tokenizer: Callable tokenizer exposing ``decode(...)`` and
            ``eos_token_id``.
        temperature: Sampling temperature passed to ``model.generate``.
    """

    # The only actions the agent may return.
    ACTIONS = ('North', 'South', 'East', 'West')

    def __init__(self, model, tokenizer, temperature=0.7):
        self.model = model
        self.tokenizer = tokenizer
        self.memory = deque(maxlen=20)  # Remember last 20 experiences
        self.temperature = temperature
        self.episode_count = 0

    def build_prompt(self, state):
        """Build the text prompt for ``state``, including up to 10 recent experiences."""
        parts = ["You are learning to navigate a maze through trial and error.\n\n"]

        recent = list(self.memory)[-10:]  # Use last 10 experiences
        if recent:
            parts.append("Your recent experiences:\n")
            for exp in recent:
                parts.append(
                    f"- At {exp['state'][:20]}..., took action {exp['action']} → "
                )
                if exp['reward'] > 5:
                    parts.append(f"REACHED GOAL! (reward: {exp['reward']:.1f})\n")
                elif exp['reward'] < -0.3:
                    parts.append(f"hit wall (reward: {exp['reward']:.1f})\n")
                else:
                    parts.append(f"moved forward (reward: {exp['reward']:.1f})\n")
            parts.append("\n")

        parts.append(f"Current state: {state}\n")
        parts.append("Based on your experience, choose the best action.\n")
        parts.append("Available actions: North, South, East, West\n")
        parts.append("Your action:")

        return "".join(parts)

    @staticmethod
    def parse_action(response):
        """Return the action mentioned earliest in ``response``, or None.

        Matching is case-insensitive. Choosing by position (rather than by a
        fixed North/South/East/West priority) means a reply such as
        ``"East, not North"`` is read as ``'East'``.
        """
        lowered = response.lower()
        chosen = None
        chosen_at = len(lowered)
        for act in LLMAgent.ACTIONS:
            found_at = lowered.find(act.lower())
            if found_at != -1 and found_at < chosen_at:
                chosen, chosen_at = act, found_at
        return chosen

    def get_action(self, state, epsilon=0.1):
        """Choose an action for ``state``.

        With probability ``epsilon`` a uniformly random action is returned
        (epsilon-greedy exploration). Otherwise the model is sampled and its
        reply parsed; if no action name is found, a random action is used.

        Returns:
            One of the strings in ``ACTIONS``.
        """
        # Epsilon-greedy exploration
        if np.random.random() < epsilon:
            return str(np.random.choice(self.ACTIONS))

        prompt = self.build_prompt(state)

        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)

        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=10,
                temperature=self.temperature,
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id
            )

        # Decode only the newly generated tokens (skip the echoed prompt).
        prompt_len = inputs['input_ids'].shape[1]
        response = self.tokenizer.decode(outputs[0][prompt_len:], skip_special_tokens=True)

        action = self.parse_action(response)
        if action is None:
            # The model produced no recognisable action; fall back to random.
            action = str(np.random.choice(self.ACTIONS))

        return action

    def update_memory(self, state, action, reward, next_state):
        """Append one transition to memory; the oldest is dropped once 20 are stored."""
        self.memory.append({
            'state': state,
            'action': action,
            'reward': reward,
            'next_state': next_state
        })

# Training Loop
def train_agent(episodes=20):
    """Play the LLM agent through repeated maze episodes and plot the results.

    Each episode resets a 5x5 maze, lets the agent act until the environment
    reports it is done, and stores every transition in the agent's memory.
    Per-episode reward and step counts are printed and finally plotted.

    Args:
        episodes: Number of episodes to run.

    Returns:
        The LLMAgent instance that was used.
    """
    maze = MazeEnv(size=5)
    learner = LLMAgent(model, tokenizer)

    reward_history, step_history = [], []
    recent_outcomes = deque(maxlen=10)  # 1 = ended on the goal, 0 = did not

    for ep in range(episodes):
        observation = maze.reset()
        ep_return = 0
        finished = False

        # Exploration rate drops by 0.02 per episode from 0.5, floored at 0.1
        explore_prob = max(0.1, 0.5 - ep * 0.02)

        print(f"\n=== Episode {ep + 1} ===")

        while not finished:
            # Ask the agent, act in the maze, then record the transition
            chosen = learner.get_action(observation, explore_prob)
            upcoming, gain, finished = maze.step(chosen)
            learner.update_memory(observation, chosen, gain, upcoming)

            ep_return += gain
            observation = upcoming

            # Report what happened on this step
            if gain > 5:
                outcome = f"🎯 GOAL REACHED! Action: {chosen}"
            elif gain < -0.3:
                outcome = f"💥 Hit wall with action: {chosen}"
            else:
                outcome = f"→ Moved {chosen}, new position in state"
            print(outcome)

        reward_history.append(ep_return)
        step_history.append(maze.steps)
        recent_outcomes.append(int(maze.player_pos == maze.goal_pos))

        print(f"Episode reward: {ep_return:.2f}, Steps: {maze.steps}")
        print(f"Success rate (last 10): {np.mean(recent_outcomes)*100:.1f}%")

        # Show the final position every fifth episode and on the last one
        is_last = ep == episodes - 1
        if ep % 5 == 0 or is_last:
            maze.render()

    # Learning curves: one panel for rewards, one for episode length
    fig, axes = plt.subplots(1, 2, figsize=(12, 4))
    panels = (
        (reward_history, 'Total Reward', 'Learning Progress: Rewards'),
        (step_history, 'Steps to Complete', 'Learning Progress: Efficiency'),
    )
    for axis, (series, y_label, heading) in zip(axes, panels):
        axis.plot(series)
        axis.set_xlabel('Episode')
        axis.set_ylabel(y_label)
        axis.set_title(heading)
        axis.grid(True)

    plt.tight_layout()
    plt.show()

    return learner

# Kick off training: announce it, run 20 episodes, then print a summary.
print(
    "\n🚀 Starting LLM RL Training...\n",
    "The agent will learn to navigate a maze using only trial and error!",
    "Watch as it builds memory of what works and what doesn't.\n",
    sep="\n",
)

trained_agent = train_agent(episodes=20)

print(
    "\n✅ Training Complete!",
    "\nThe LLM has learned to solve the maze through reinforcement learning,",
    "using its memory of past experiences to make better decisions.",
    sep="\n",
)

# Evaluate the trained agent on a fresh maze with epsilon=0, i.e. no random
# exploration moves (the model's own sampling in get_action is unchanged).
print("\n🧪 Testing trained agent (without exploration)...")
env = MazeEnv(size=5)
state = env.reset()
done, test_reward = False, 0

print("\nFinal test run:")
# Stop when the environment reports done or after 30 steps, whichever is first
while not (done or env.steps >= 30):
    action = trained_agent.get_action(state, epsilon=0)  # No exploration
    state, reward, done = env.step(action)
    test_reward = test_reward + reward
    print(f"Step {env.steps}: {action} → Reward: {reward:.2f}")

env.render()
print(f"\nTest episode reward: {test_reward:.2f}")
print(
    "🎉 Successfully reached the goal!"
    if env.player_pos == env.goal_pos
    else "⏱️ Timed out"
)