In [12]:
"""
Changes in this version:
1. The prompt for the LLM is more standardized.
2. Added details about the state variables.
3. Added the documentation URL of each task.

The performance on CartPole, Acrobot, and MountainCarContinuous is much better, but it is also volatile.
"""
# Python=3.10.8
import inspect
import math
import os
import random
import time

import gymnasium as gym
import numpy as np
import requests

# ----------------------------
# LLM HTTP call function
# ----------------------------
LLM_URL = 'https://api.yesapikey.com/v1/chat/completions'
LLM_HEADERS = {
    'Content-Type': 'application/json',
    'Authorization': 'Bearer sk-PaTN85cFufotMmPm97Ae2546B0874aA29b6a86Ae069b7b4b'
}

def call_llm(prompt, model="gpt-4.1-2025-04-14", temperature=0.2, max_tokens=1024):
    data = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "temperature": temperature,
        "max_tokens": max_tokens
    }
    while True:
        try:
            response = requests.post(LLM_URL, json=data, headers=LLM_HEADERS)
            if response.status_code == 200:
                resp_json = response.json()
                if 'choices' in resp_json and resp_json['choices']:
                    content = resp_json['choices'][0].get('message', {}).get('content')
                    return content
        except Exception as e:
            print("LLM call exception:", e)
        time.sleep(2)

# ----------------------------
# Environment documentation mapping
# ----------------------------
ENV_DOC_URL = {
    "Acrobot-v1": "https://gymnasium.farama.org/environments/classic_control/acrobot/",
    "CartPole-v1": "https://gymnasium.farama.org/environments/classic_control/cart_pole/",
    "MountainCarContinuous-v0": "https://gymnasium.farama.org/environments/classic_control/mountain_car_continuous/",
    "MountainCar-v0": "https://gymnasium.farama.org/environments/classic_control/mountain_car/",
    "Pendulum-v1": "https://gymnasium.farama.org/environments/classic_control/pendulum/"
}

def get_env_doc_url(env_id: str) -> str:
    return ENV_DOC_URL.get(env_id, "https://gymnasium.farama.org/")

# ----------------------------
# Static Knowledge
# ----------------------------
STATIC_KNOWLEDGE = {
    "CartPole-v1": {
        "state_dim": 4,
        "state_vars": ["cart_position", "cart_velocity", "pole_angle", "pole_velocity"],
        "state_ranges": [(-4.8, 4.8), (-float("inf"), float("inf")), (-0.418, 0.418), (-float("inf"), float("inf"))],
        "action_space": [0, 1],
        "reward_threshold": 475,
        "action_type": "discrete"
    },
    "Acrobot-v1": {
        "state_dim": 6,
        "state_vars": ["cos_theta1", "sin_theta1", "cos_theta2", "sin_theta2", "theta1_dot", "theta2_dot"],
        "state_ranges": [(-1, 1), (-1, 1), (-1, 1), (-1, 1), (-float("inf"), float("inf")), (-float("inf"), float("inf"))],
        "action_space": [0, 1, 2],
        "reward_threshold": -100,
        "action_type": "discrete"
    },
    "MountainCar-v0": {
        "state_dim": 2,
        "state_vars": ["position", "velocity"],
        "state_ranges": [(-1.2, 0.6), (-0.07, 0.07)],
        "action_space": [0, 1, 2],
        "reward_threshold": -110,
        "action_type": "discrete"
    },
    "MountainCarContinuous-v0": {
        "state_dim": 2,
        "state_vars": ["position", "velocity"],
        "state_ranges": [(-1.2, 0.6), (-0.07, 0.07)],
        "action_space": [-1.0, 1.0],
        "reward_threshold": 90,
        "action_type": "continuous"
    },
    "Pendulum-v1": {
        "state_dim": 3,
        "state_vars": ["cos_theta", "sin_theta", "theta_dot"],
        "state_ranges": [(-1, 1), (-1, 1), (-float("inf"), float("inf"))],
        "action_space": [-2.0, 2.0],
        "reward_threshold": -200,
        "action_type": "continuous"
    }
}

# ----------------------------
# Knowledge module
# ----------------------------
class Knowledge:
    def __init__(self):
        self.static_knowledge = {}
        self.dynamic_knowledge = []

    def load_static_knowledge(self, env_id):
        if env_id not in STATIC_KNOWLEDGE:
            raise ValueError("Unsupported environment")
        self.static_knowledge = STATIC_KNOWLEDGE[env_id]
        self.dynamic_knowledge = []

    def add_dynamic_entry(self, entry):
        self.dynamic_knowledge.append(entry)

    def get_dynamic_guidance(self, env_id):
        prompt = f"""
I am generating a policy in environment {env_id}.
Current dynamic knowledge entries: {self.dynamic_knowledge}

Please provide concise heuristic suggestions for policy generation based on this knowledge, such as:
- State ranges to prioritize
- Common failing action patterns
- Recommended threshold adjustments

Return a short, structured bullet list (no prose).
"""
        guidance = call_llm(prompt)
        return guidance

# ----------------------------
# Memory module
# ----------------------------
class Memory:
    def __init__(self):
        self.episodes = []

    def start_episode(self):
        self.episodes.append({"steps": [], "summary": None})

    def add_step(self, s, a, r, done):
        if not self.episodes:
            raise ValueError("Please call start_episode() before adding steps!")
        self.episodes[-1]["steps"].append({"s": s, "a": a, "r": r, "done": done})

    def add_episode_summary(self, env_id, policy_version):
        if not self.episodes:
            raise ValueError("No running episode!")
        steps = self.episodes[-1]["steps"]
        total_reward = sum(step["r"] for step in steps)
        length = len(steps)
        self.episodes[-1]["summary"] = {
            "env_id": env_id,
            "policy_version": policy_version,
            "return": total_reward,
            "length": length
        }

    def get_recent_episodes(self, n=5):
        summaries = [ep["summary"] for ep in self.episodes if ep["summary"] is not None]
        return summaries[-n:]

# ----------------------------
# Reflection module
# ----------------------------
class Reflection:
    def __init__(self, knowledge: Knowledge):
        self.knowledge = knowledge

    def metrics(self, recent_episodes):
        returns = [ep["return"] for ep in recent_episodes]
        lengths = [ep["length"] for ep in recent_episodes]
        avg_return = np.mean(returns) if returns else 0
        avg_length = np.mean(lengths) if lengths else 0
        return {"avg_return": avg_return, "avg_length": avg_length}

    def failure_pattern(self, recent_episodes, env_id):
        prompt = f"""
I have the following {env_id} environment episode summaries: {recent_episodes}
Please analyze the most common failure patterns, including state characteristics, action issues, and return patterns. Focus only on key points.
Return a concise paragraph.
"""
        pattern = call_llm(prompt).strip()
        self.knowledge.add_dynamic_entry({"failure_pattern": pattern})
        return pattern

    def edit_suggestion(self, recent_episodes, env_id):
        prompt = f"""
Based on recent episode data from environment {env_id}: {recent_episodes}
Generate one policy editing suggestion in one of the following formats:
- add_rule(condition -> action)
- modify_threshold(variable, old_value, new_value)
- reprioritize(rule_i over rule_j)

Return exactly one line with one edit.
"""
        suggestion = call_llm(prompt).strip()
        self.knowledge.add_dynamic_entry({"edit_suggestion": suggestion})
        return suggestion

# ----------------------------
# Safe policy call
# ----------------------------
def safe_policy_call(state, policy_fn, sk):
    try:
        a = policy_fn(state)
    except Exception:
        # fallback safe action
        if sk["action_type"] == "discrete":
            a = sk["action_space"][0]
        else:
            a = (sk["action_space"][0] + sk["action_space"][1]) / 2.0
    # clip if continuous
    if sk["action_type"] == "continuous":
        a = np.clip(a, sk["action_space"][0], sk["action_space"][1])
    return a

# ----------------------------
# Safe step wrapper
# ----------------------------
def safe_step(env, action):
    sk = STATIC_KNOWLEDGE[env.unwrapped.spec.id]
    if sk["action_type"] == "continuous":
        # clip and wrap in array if needed
        action = np.array([np.clip(action, sk["action_space"][0], sk["action_space"][1])]) if np.isscalar(action) else np.clip(np.array(action), sk["action_space"][0], sk["action_space"][1])
    return env.step(action)

# ----------------------------
# Policy generation helper
# ----------------------------
def _action_constraints_text(static_knowledge: dict) -> str:
    a = static_knowledge["action_space"]
    if static_knowledge.get("action_type") == "discrete":
        return f"Discrete actions; valid actions are exactly the integers in {a}."
    else:
        lo, hi = a[0], a[1]
        return f"Continuous action; return a single float within [{lo}, {hi}]. Clip if necessary."

def generate_base_policy(env_id, knowledge: Knowledge):
    guidance = knowledge.get_dynamic_guidance(env_id) or ""
    doc_url = get_env_doc_url(env_id)
    sk = knowledge.static_knowledge
    action_desc = _action_constraints_text(sk)
    state_vars_text = "\n".join([f"- {name} in range {rng}" for name, rng in zip(sk["state_vars"], sk["state_ranges"])])

    prompt = f"""
You are writing a deterministic, white-box policy for Gymnasium environment "{env_id}".
Environment documentation: {doc_url}

Observation (state vector):
{state_vars_text}

Action constraints:
- {action_desc}
- May import 'math' if needed
- Must be deterministic
- Safe defaults:
  - Discrete: first valid action in {sk['action_space']}
  - Continuous: midpoint of allowed interval

Dynamic guidance:
{guidance}

Output requirements:
- Only one Python function: def policy(state): ...
- No explanations, no markdown, no print
- Returned action strictly satisfies constraints
"""
    policy_code = call_llm(prompt)
    local_vars = {"math": math, "np": np}
    try:
        exec(policy_code, local_vars)
        policy_fn = local_vars.get("policy")
        if policy_fn is None:
            # fallback default
            if sk["action_type"] == "discrete":
                def policy_fn(state): return sk["action_space"][0]
            else:
                lo, hi = sk["action_space"]
                def policy_fn(state): return (lo + hi)/2.0
    except Exception:
        if sk["action_type"] == "discrete":
            def policy_fn(state): return sk["action_space"][0]
        else:
            lo, hi = sk["action_space"]
            def policy_fn(state): return (lo + hi)/2.0
    return policy_fn

def apply_edit(policy_fn, edit_text, knowledge: Knowledge):
    sk = knowledge.static_knowledge
    try:
        existing_src = inspect.getsource(policy_fn)
    except Exception:
        existing_src = "def policy(state):\n    return " + (str(sk["action_space"][0]) if sk["action_type"]=="discrete" else str((sk["action_space"][0]+sk["action_space"][1])/2.0))
    action_desc = _action_constraints_text(sk)

    doc_url = get_env_doc_url(knowledge.dynamic_knowledge[0].get("env_id", ""))
    prompt = f"""
Revise deterministic white-box policy for environment at {doc_url}.
Constraints: {action_desc}
Current policy:
{existing_src}
Edit suggestion: {edit_text}
You may use 'math' module.

Output only a valid Python function def policy(state): ...
"""
    policy_code = call_llm(prompt)
    local_vars = {"math": math, "np": np}
    try:
        exec(policy_code, local_vars)
        new_policy_fn = local_vars.get("policy")
        return new_policy_fn if new_policy_fn else policy_fn
    except Exception:
        return policy_fn

# ----------------------------
# Main closed-loop training
# ----------------------------
def run_env_loop(env_id, max_iters=5, episodes_per_iter=10):
    knowledge = Knowledge()
    knowledge.load_static_knowledge(env_id)
    knowledge.add_dynamic_entry({"env_id": env_id})
    memory = Memory()
    reflection = Reflection(knowledge)

    policy_version = 0
    first_iter = True
    policy_fn = None

    for iter_idx in range(max_iters):
        policy_version += 1
        if first_iter:
            policy_fn = generate_base_policy(env_id, knowledge)
            first_iter = False
        else:
            recent_episodes = memory.get_recent_episodes()
            suggestion = reflection.edit_suggestion(recent_episodes, env_id)
            policy_fn = apply_edit(policy_fn, suggestion, knowledge)

        env = gym.make(env_id)
        episode_rewards = []

        for ep in range(episodes_per_iter):
            s, _ = env.reset()
            done = False
            memory.start_episode()
            while not done:
                a = safe_policy_call(s, policy_fn, knowledge.static_knowledge)
                s_next, r, terminated, truncated, info = safe_step(env, a)
                done = terminated or truncated
                memory.add_step(s, a, r, done)
                s = s_next
            memory.add_episode_summary(env_id, policy_version)
            episode_rewards.append(memory.episodes[-1]["summary"]["return"])

        recent_episodes = memory.get_recent_episodes()
        metrics = reflection.metrics(recent_episodes)
        print(f"Iteration {iter_idx+1}, Avg Return: {metrics['avg_return']}")

        pattern = reflection.failure_pattern(recent_episodes, env_id)
        print("Failure Pattern:", pattern)

        threshold = knowledge.static_knowledge.get("reward_threshold", 0)
        if metrics["avg_return"] >= threshold:
            print(f"Converged to threshold, stopping training. Avg Return={metrics['avg_return']}")
            break

# ----------------------------
# Run multiple control tasks
# ----------------------------
if __name__ == "__main__":
    env_list = [
        "Acrobot-v1",
        "CartPole-v1",
        "MountainCarContinuous-v0",
        "MountainCar-v0",
        "Pendulum-v1"
    ]
    for env_id in env_list:
        print(f"==== Running {env_id} ====")
        run_env_loop(env_id)


==== Running Acrobot-v1 ====
Iteration 1, Avg Return: -344.0
Failure Pattern: The episode summaries for Acrobot-v1 show a recurring failure pattern where most episodes terminate at the maximum allowed length (500 steps) with the lowest possible return (-500.0), indicating the agent frequently fails to solve the task within the time limit. Only two episodes achieve early termination with better returns (-101.0 and -119.0), suggesting occasional but inconsistent success. This pattern implies that the policy often struggles to find effective action sequences, likely resulting in the agent remaining in suboptimal states (e.g., unable to swing the end-effector high enough). The predominant issue appears to be ineffective exploration or poor action selection, leading to stagnation and repeated timeouts rather than successful task completion.


KeyboardInterrupt: 