In [99]:
from __future__ import annotations

from minigrid.core.constants import COLOR_NAMES
from minigrid.core.grid import Grid
from minigrid.core.mission import MissionSpace
from minigrid.core.world_object import Door, Goal, Key, Wall
from minigrid.manual_control import ManualControl
from minigrid.minigrid_env import MiniGridEnv
from minigrid.wrappers import Wrapper
import numpy as np


In [100]:
class SimpleEnv(MiniGridEnv):
    def __init__(
        self,
        size=10,
        agent_start_pos=(1, 1),
        agent_start_dir=0,
        max_steps: int | None = None,
        **kwargs,
    ):
        self.agent_start_pos = agent_start_pos
        self.agent_start_dir = agent_start_dir

        mission_space = MissionSpace(mission_func=self._gen_mission)

        if max_steps is None:
            max_steps = 4 * size**2

        super().__init__(
            mission_space=mission_space,
            grid_size=size,
            # Set this to True for maximum speed
            see_through_walls=True,
            max_steps=max_steps,
            **kwargs,
        )

    @staticmethod
    def _gen_mission():
        return "grand mission"

    def _gen_grid(self, width, height):
        # Create an empty grid
        self.grid = Grid(width, height)

        # Generate the surrounding walls
        self.grid.wall_rect(0, 0, width, height)

        # Generate vertical separation wall
        for i in range(0, height):
            self.grid.set(5, i, Wall())
        
        # Place the door and key
        self.grid.set(5, 6, Door(COLOR_NAMES[0], is_locked=True))
        self.grid.set(3, 6, Key(COLOR_NAMES[0]))

        # Place a goal square in the bottom-right corner
        self.put_obj(Goal(), width - 2, height - 2)

        # Place the agent
        if self.agent_start_pos is not None:
            self.agent_pos = self.agent_start_pos
            self.agent_dir = self.agent_start_dir
        else:
            self.place_agent()

        self.mission = "grand mission"

class CustomManualControl(ManualControl):
    def key_handler(self, event):
        # If the 't' key is pressed, execute the pickup action.
        if event.key == 't':
            print('here')
            self.step(self.env.actions.pickup)
        else:
            # For other keys, fall back to the default behavior.
            super().key_handler(event)



class ActionBonus(Wrapper):
    def __init__(self, env):
        """A wrapper that adds an exploration bonus to less visited (state,action) pairs.

        Args:
            env: The environment to apply the wrapper
        """
        super().__init__(env)
        self.counts = {}

    def step(self, action):
        """Steps through the environment with `action`."""
        obs, reward, terminated, truncated, info = self.env.step(action)

        env = self.unwrapped
        tup = (tuple(env.agent_pos), env.agent_dir, action)

        # Get the count for this (s,a) pair
        pre_count = 0
        if tup in self.counts:
            pre_count = self.counts[tup]

        # Update the count for this (s,a) pair
        new_count = pre_count + 1
        self.counts[tup] = new_count

        bonus = - np.sqrt(new_count)
        reward += bonus

        return obs, reward, terminated, truncated, info

In [101]:
import gymnasium as gym
import torch
import torch.nn as nn

from stable_baselines3.common.torch_layers import BaseFeaturesExtractor

In [102]:
class MinigridFeaturesExtractor(BaseFeaturesExtractor):
    def __init__(self, observation_space: gym.Space, features_dim: int = 512, normalized_image: bool = False) -> None:
        super().__init__(observation_space, features_dim)
        n_input_channels = observation_space.shape[0]
        self.cnn = nn.Sequential(
            nn.Conv2d(n_input_channels, 16, (2, 2)),
            nn.ReLU(),
            nn.Conv2d(16, 32, (2, 2)),
            nn.ReLU(),
            nn.Conv2d(32, 64, (2, 2)),
            nn.ReLU(),
            nn.Flatten(),
        )

        # Compute shape by doing one forward pass
        with torch.no_grad():
            n_flatten = self.cnn(torch.as_tensor(observation_space.sample()[None], dtype=torch.float32)).shape[1]

        self.linear = nn.Sequential(nn.Linear(n_flatten, features_dim), nn.ReLU())

    def forward(self, observations: torch.Tensor) -> torch.Tensor:
        return self.linear(self.cnn(observations))

In [94]:
import minigrid
from minigrid.wrappers import ImgObsWrapper
from stable_baselines3 import PPO
import numpy as np
import argparse
from datetime import datetime
from pdb import set_trace
from time import time

import gymnasium as gym
import numpy as np
import torch as th
import torch.nn as nn
from gymnasium.spaces import Box, Dict
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import CheckpointCallback
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor


policy_kwargs = dict(
    features_extractor_class=MinigridFeaturesExtractor,
    features_extractor_kwargs=dict(features_dim=128),
)

env = SimpleEnv(render_mode="rgb_array") #gym.make("MiniGrid-Empty-16x16-v0", render_mode="rgb_array")
env = ActionBonus(env)
env = ImgObsWrapper(env)

stamp = datetime.fromtimestamp(time()).strftime("%Y%m%d-%H%M%S")


checkpoint_callback = CheckpointCallback(
    save_freq=1e4,
    save_path=f"./models/ppo/miniworld_gotoobj_{stamp}/",
    name_prefix="iter",
)

model = PPO(
    "MultiInputPolicy",
    env,
    policy_kwargs=policy_kwargs,
    verbose=1,
    tensorboard_log="./logs/ppo/miniworld_gotoobj_tensorboard/",
)
model.learn(
    50000,
    tb_log_name=f"{stamp}",
    callback=checkpoint_callback,
)


Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Wrapping the env in a VecTransposeImage.
Logging to ./logs/ppo/miniworld_gotoobj_tensorboard/20250226-220237_1
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 400      |
|    ep_rew_mean     | 284      |
| time/              |          |
|    fps             | 274      |
|    iterations      | 1        |
|    time_elapsed    | 7        |
|    total_timesteps | 2048     |
---------------------------------
------------------------------------------
| rollout/                |              |
|    ep_len_mean          | 400          |
|    ep_rew_mean          | 238          |
| time/                   |              |
|    fps                  | 228          |
|    iterations           | 2            |
|    time_elapsed         | 17           |
|    total_timesteps      | 4096         |
| train/                  |              |
|    approx_kl          

<stable_baselines3.ppo.ppo.PPO at 0x1d961b0b910>

In [107]:
import gymnasium as gym
from stable_baselines3 import PPO
from minigrid.wrappers import ActionBonus
from gymnasium.spaces import Box, Dict



env = SimpleEnv(render_mode="human")  #gym.make("MiniGrid-Empty-16x16-v0", render_mode="human")
env = ActionBonus(env)
env = ImgObsWrapper(env)

from stable_baselines3 import PPO
loaded_model = PPO(
    "MultiInputPolicy",
    env,
    policy_kwargs=policy_kwargs,
    verbose=1,
)

# add the experiment time stamp
loaded_model = loaded_model.load(f"models/ppo/miniworld_gotoobj_20250226-220237/iter_500000_steps.zip", env=env)

obs, info = env.reset()
rewards = 0

for i in range(2000):
    action, _state = loaded_model.predict(obs, deterministic=True)
    obs, reward, terminated, truncated, info = env.step(action.item())
    rewards += reward

    if i % 10 == 0:
        print(f"Step: {i}")
        print(f"Action: {action}")
        print(f"Reward: {reward}")

    if terminated or truncated:
        print(f"Test reward: {rewards}")
        obs, info = env.reset()
        rewards = 0
        continue

print(f"Test reward: {rewards}")

env.close()

Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Wrapping the env in a VecTransposeImage.
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Wrapping the env in a VecTransposeImage.


AttributeError: 'Box' object has no attribute 'spaces'

: 