# ACCEL IMPLEMENTATION TEST

# In [7]:
import numpy as np
import gymnasium as gym
from gymnasium.spaces import Discrete, Box
import gymnasium.spaces as spaces

from minigrid.core.mission import MissionSpace
from minigrid.core.world_object import Goal, Wall
from minigrid.minigrid_env import MiniGridEnv, Grid

from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.env_util import make_vec_env
import matplotlib.pyplot as plt


# In [8]:
# ====================================================
# 1. Custom MiniGrid Environment that returns only the image
#    for SB3's PPO (which expects a Box space).
# ====================================================
class MyCustomGrid(MiniGridEnv):
    """
    MiniGrid level with randomly placed interior walls that exposes only the
    'image' part of the observation, so SB3's PPO sees a plain Box space.

    Recognised ``config`` keys (all optional):
        width, height : grid dimensions including the outer wall (default 8).
        num_blocks    : number of random wall placements attempted (default 5).
        seed_val      : seed applied by ``reset`` when the caller passes none,
                        so the same config regenerates the same layout.
        agent_start   : (x, y) interior cell for the agent; random if absent.
    """

    def __init__(self, config=None, **kwargs):
        if config is None:
            config = {}
        self.config = config

        # Extract parameters from config. These are set before calling the
        # parent constructor so they exist if the parent triggers _gen_grid.
        self.width = config.get("width", 8)
        self.height = config.get("height", 8)
        self.num_blocks = config.get("num_blocks", 5)
        self.custom_seed = config.get("seed_val", None)
        self.agent_start = config.get("agent_start", None)

        # Side length of the agent's egocentric (partial) view.
        agent_view_size = 5

        mission_space = MissionSpace(mission_func=lambda: "get to the green goal square")

        # Pass width/height rather than grid_size: the parent stores whatever
        # it is given in self.width/self.height, so grid_size=max(w, h) would
        # silently turn every non-square config into a square level.
        super().__init__(
            width=self.width,
            height=self.height,
            max_steps=self.width * self.height * 2,
            see_through_walls=False,
            agent_view_size=agent_view_size,
            mission_space=mission_space,
            **kwargs
        )

        # The 'image' observation is the agent's partial view, i.e.
        # (agent_view_size, agent_view_size, 3), independent of the grid size.
        # Declaring (height, width, 3) only matched by coincidence on 5x5 grids.
        self.observation_space = Box(
            low=0,
            high=255,
            shape=(agent_view_size, agent_view_size, 3),
            dtype=np.uint8
        )

    def _gen_grid(self, width, height):
        """
        Generate the grid layout for a new episode.

        Builds a walled rectangle of self.width x self.height, scatters up to
        self.num_blocks walls inside it, then places the agent and the goal.

        Raises:
            ValueError: if ``agent_start`` is not an interior cell.
        """
        self.grid = Grid(self.width, self.height)
        # Surround with walls
        self.grid.wall_rect(0, 0, self.width, self.height)

        # Validate the requested start cell up front; it is kept wall-free below.
        reserved = None
        if self.agent_start is not None:
            ax, ay = self.agent_start
            if not (0 < ax < self.width - 1 and 0 < ay < self.height - 1):
                raise ValueError(
                    f"agent_start {self.agent_start!r} is not an interior cell of a "
                    f"{self.width}x{self.height} grid"
                )
            reserved = (ax, ay)

        # Always leave two interior cells free (agent + goal); otherwise the
        # placement helpers below would retry forever on a full grid.
        interior_cells = max(0, (self.width - 2) * (self.height - 2))
        max_walls = max(0, interior_cells - 2)
        wall_cells = set()

        # Place random walls inside. Both coordinates are always drawn so the
        # random stream does not depend on which placements get skipped.
        for _ in range(self.num_blocks):
            r = self._rand_int(1, self.height - 1)
            c = self._rand_int(1, self.width - 1)
            cell = (c, r)
            if cell == reserved:
                continue
            if cell not in wall_cells and len(wall_cells) >= max_walls:
                continue
            wall_cells.add(cell)
            self.put_obj(Wall(), c, r)

        # Place the agent in a specific position or random
        if reserved is not None:
            self.place_agent(top=reserved, size=(1, 1), rand_dir=False)
        else:
            self.place_agent()

        # Place a goal object
        self.place_obj(Goal())

    def reset(self, **kwargs):
        """
        Reset the environment and return only the 'image' array instead of
        the dict with 'image' and 'mission'.

        When the caller supplies no seed and the config carries ``seed_val``,
        that value is used so the level layout is reproducible across resets.
        """
        if kwargs.get("seed") is None and self.custom_seed is not None:
            # Gymnasium only accepts plain Python ints as seeds.
            kwargs["seed"] = int(self.custom_seed)
        obs, info = super().reset(**kwargs)
        return self._convert_obs(obs), info

    def step(self, action):
        """
        Advance one step and convert the dict observation into the image only.
        """
        obs, reward, terminated, truncated, info = super().step(action)
        return self._convert_obs(obs), reward, terminated, truncated, info

    def _convert_obs(self, original_obs):
        """
        original_obs is typically {'image':..., 'mission':...}.
        Return just original_obs['image'] so the observation fits the Box space.
        """
        return original_obs["image"]


# ====================================================
# 2. Simple “level buffer” 
# ====================================================
# Class that stores generated levels together with their scores.
class LevelBuffer:
    """
    Bounded store of (config, score) pairs.

    When the buffer grows past ``max_size`` only the ``max_size`` highest
    scoring entries are kept. Sampling is proportional to score.
    """

    def __init__(self, max_size=50):
        self.max_size = max_size
        self.data = []  # list of (config_dict, score) tuples

    def add(self, config, score):
        """Append (config, score) and evict the lowest scores on overflow."""
        self.data.append((config, score))
        if len(self.data) > self.max_size:
            # Keep only the top `max_size` entries by score.
            self.data.sort(key=lambda x: x[1], reverse=True)
            self.data = self.data[: self.max_size]

    def sample_config(self):
        """
        Return a stored config drawn with probability proportional to its
        score, or None if the buffer is empty.

        Negative scores are treated as zero weight, because np.random.choice
        rejects negative probabilities. If the total weight is (near) zero or
        not finite, the draw is uniform.
        """
        if not self.data:
            return None

        weights = np.clip(
            np.asarray([score for _, score in self.data], dtype=float), 0.0, None
        )
        total = weights.sum()

        if not np.isfinite(total) or total <= 1e-9:
            # fallback to uniform
            idx = np.random.randint(len(self.data))
        else:
            idx = np.random.choice(len(self.data), p=weights / total)
        return self.data[idx][0]

# ====================================================
# 3. Utility Functions
# ====================================================
def random_config():
    """
    Build a fresh level config: a fixed 5x5 grid with a random number of
    wall blocks (0-14) and a random seed value. No agent start is set.
    """
    level = dict(width=5, height=5)  # grid size is currently fixed
    level["num_blocks"] = np.random.randint(0, 15)
    level["seed_val"] = np.random.randint(0, 999999)
    return level
def edit_config(old_config):
    """
    Return a mutated copy of ``old_config``; the input is left untouched.

    ``num_blocks`` is shifted by a random amount in {-2, -1, 1, 2} (never
    below 0) and ``seed_val`` is redrawn. A missing ``num_blocks`` falls back
    to 5, the same default the environment uses. Both values are stored as
    plain Python ints so configs stay valid as Gymnasium seeds and print
    cleanly.
    """
    new_config = dict(old_config)
    delta = int(np.random.choice([-2, -1, 1, 2]))
    new_config["num_blocks"] = max(0, int(old_config.get("num_blocks", 5)) + delta)
    new_config["seed_val"] = int(np.random.randint(0, 999999))
    return new_config

def _rollout_return(env, policy, seed, max_steps):
    """Run one deterministic episode of ``policy`` in ``env`` and return its total reward."""
    obs, _ = env.reset(seed=seed)
    total_reward = 0
    for _ in range(max_steps):
        action, _ = policy.predict(obs, deterministic=True)
        obs, reward, terminated, truncated, _ = env.step(action)
        total_reward += reward
        if terminated or truncated:
            break
    return total_reward


def calculate_regret(config, student_model, teacher_model, max_steps=200):
    """
    Calculate regret as the difference between the teacher's performance
    and the student's performance on the same level.

    Args:
        config: level config dict passed to MyCustomGrid. Its optional
            ``seed_val`` seeds both rollouts.
        student_model: policy being trained; must expose ``predict``.
        teacher_model: reference policy; must expose ``predict``.
        max_steps: cap on the number of steps per rollout.

    Returns:
        ``max(0, teacher_return - student_return)``.
    """
    # Both resets use the same seed so teacher and student face the same
    # layout; the grid is regenerated randomly on every reset otherwise.
    seed = config.get("seed_val") if config else None
    if seed is not None:
        seed = int(seed)  # Gymnasium only accepts plain Python ints

    env = MyCustomGrid(config)
    try:
        teacher_total_reward = _rollout_return(env, teacher_model, seed, max_steps)
        # The original code called the undefined name `model` here.
        student_total_reward = _rollout_return(env, student_model, seed, max_steps)
    finally:
        env.close()

    return max(0, teacher_total_reward - student_total_reward)

def initialize_ppo(env, learning_rate=1e-4):
    """
    Create a verbose PPO agent with an MLP policy on ``env``, using
    128-step rollouts, minibatches of 64 and the given learning rate.
    """
    hyperparams = {
        "verbose": 1,
        "n_steps": 128,
        "batch_size": 64,
        "learning_rate": learning_rate,
    }
    return PPO("MlpPolicy", env, **hyperparams)


# ====================================================
# 4. Main ACCEL Loop
# ====================================================

def main_accel_demo(total_iterations=30, replay_prob=0.7, train_steps=2000):
    """
    Run the ACCEL-style demo loop.

    Steps:
      1. Build teacher and student PPO models on a small bootstrap level.
      2. Pretrain the teacher on 10 random levels.
      3. Seed the level buffer with 10 random levels scored by regret.
      4. For each iteration, either score a brand-new random level or, with
         probability ``replay_prob``, train the student on a buffered level
         and score a mutated copy of it. Each scored level is added to the
         buffer.
      5. Plot the per-iteration regrets and print the five highest-regret
         levels.

    NOTE(review): ``plot_progress`` is not defined in this part of the file;
    it is assumed to be provided elsewhere.
    """
    # Bootstrap environment, used only to construct the two models.
    bootstrap_env = MyCustomGrid(config={"width": 5, "height": 5, "num_blocks": 1})
    bootstrap_env.reset()
    teacher_model = initialize_ppo(bootstrap_env)
    student_model = initialize_ppo(bootstrap_env)

    level_buffer = LevelBuffer(max_size=50)
    iteration_regrets = []

    # Teacher pretraining on random levels.
    for _ in range(10):
        teacher_model.set_env(MyCustomGrid(random_config()))
        teacher_model.learn(total_timesteps=train_steps)

    # Initial buffer contents.
    for _ in range(10):
        initial_cfg = random_config()
        level_buffer.add(
            initial_cfg, calculate_regret(initial_cfg, student_model, teacher_model)
        )

    for iteration_no in range(1, total_iterations + 1):
        print(f"\n=== ITERATION {iteration_no}/{total_iterations} ===")
        replay_requested = np.random.rand() < replay_prob

        if replay_requested and level_buffer.data:
            # Train the student on a buffered level, then score a mutation of it.
            parent_cfg = level_buffer.sample_config()
            student_model.set_env(MyCustomGrid(parent_cfg))
            student_model.learn(total_timesteps=train_steps)

            child_cfg = edit_config(parent_cfg)
            regret = calculate_regret(child_cfg, student_model, teacher_model)
            level_buffer.add(child_cfg, regret)
            print(f"  Replayed + mutated config, regret={regret:.3f}")
        else:
            # No replay (or nothing to replay): score a brand-new random level.
            fresh_cfg = random_config()
            regret = calculate_regret(fresh_cfg, student_model, teacher_model)
            level_buffer.add(fresh_cfg, regret)
            print(f"  Sampled new config, regret={regret:.3f}")

        iteration_regrets.append(regret)

    # Visualize progress
    plot_progress(total_iterations, iteration_regrets)

    print("\nDone. Final buffer size:", len(level_buffer.data))
    print("Top-5 hardest levels (config, regret):")
    level_buffer.data.sort(key=lambda entry: entry[1], reverse=True)
    for rank, (level_cfg, level_regret) in enumerate(level_buffer.data[:5], start=1):
        print(f"{rank}. regret={level_regret:.3f}, config={level_cfg}")

# Run the ACCEL demo with its default settings only when this file is
# executed as a script, not when it is imported.
if __name__ == "__main__":
    main_accel_demo()


# Notebook cell output: NameError: name 'configure' is not defined