In [2]:
from __future__ import annotations

from minigrid.core.constants import COLOR_NAMES
from minigrid.core.grid import Grid
from minigrid.core.mission import MissionSpace
from minigrid.core.world_object import Door, Goal, Key, Wall
from minigrid.manual_control import ManualControl
from minigrid.minigrid_env import MiniGridEnv
from minigrid.core.actions import Actions
import numpy as np
import random

import minigrid
from minigrid.wrappers import ImgObsWrapper
from gymnasium.core import ObservationWrapper
from stable_baselines3 import PPO
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor
from stable_baselines3.common.callbacks import BaseCallback
from typing import Callable, Dict, List, Optional, Tuple, Type, Union

from gymnasium import spaces
from stable_baselines3 import PPO
from stable_baselines3.common.policies import ActorCriticPolicy

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import gymnasium as gym

from plot import make_plot
# import matplotlib
# matplotlib.use('TkAgg') 
import matplotlib.pyplot as plt
import pdb

pygame 2.5.2 (SDL 2.28.3, Python 3.9.12)
Hello from the pygame community. https://www.pygame.org/contribute.html


  fn()


In [3]:
class myenv(MiniGridEnv):
    def __init__(
        self,
        size=5, 
        agent_start_pos=(1, 1),
        agent_start_dir=0,
        max_steps: int | None = None,
        **kwargs,):
        
        self.grid_size = size
        self.agent_start_pos = agent_start_pos
        self.agent_start_dir = agent_start_dir
        
        self.is_sb3_mode = False

        mission_space = MissionSpace(mission_func=self._gen_mission)

        if max_steps is None:
            max_steps = 4 * size**2

        super().__init__(
            mission_space=mission_space,
            grid_size=size,
            see_through_walls=False,  # Set to False for a more realistic environment
            max_steps=max_steps,
            **kwargs,
        )
        self.door_opened = False
        self.key_found = False

    @staticmethod
    def _gen_mission():
        return "Find the key to open the door and reach the goal"

    def _gen_grid(self, width, height):
        # Create an empty grid
        self.grid = Grid(width, height)

        # Generate the surrounding walls
        self.grid.wall_rect(0, 0, width, height)

        # Randomly place the door
        door_color = COLOR_NAMES[0]
        door_pos = self._random_position(exclude=[(1, 1)])  # Exclude agent's start position
        self.grid.set(*door_pos, Door(door_color, is_locked=True))

        # Randomly place the key
        key_pos = self._random_position(exclude=[(1, 1), door_pos])  # Exclude agent's start position and door position
        self.grid.set(*key_pos, Key(door_color))

        # Place a goal square in the bottom-right corner
        self.put_obj(Goal(), width - 2, height - 2)

        # Place the agent
        if self.agent_start_pos is not None:
            self.agent_pos = self.agent_start_pos
            self.agent_dir = self.agent_start_dir
        else:
            self.place_agent()

        self.mission = "Find the key to open the door and reach the goal"
    
    def _random_position(self, exclude=[]):
        while True:
            pos = (random.randint(1, self.grid_size - 2), random.randint(1, self.grid_size - 2))
            if pos not in exclude and self.grid.get(*pos) is None:
                return pos
            
    def reset(self, *, seed=None, options=None):
        super().reset(seed=seed)
        # Reinitialize episode-specific variables
        self.agent_pos = (-1, -1)
        self.agent_dir = -1

        # Generate a new random grid at the start of each episode
        self._gen_grid(self.width, self.height)

        # These fields should be defined by _gen_grid
        assert (
            self.agent_pos >= (0, 0)
            if isinstance(self.agent_pos, tuple)
            else all(self.agent_pos >= 0) and self.agent_dir >= 0
        )

        # Check that the agent doesn't overlap with an object
        start_cell = self.grid.get(*self.agent_pos)
        assert start_cell is None or start_cell.can_overlap()

        # Item picked up, being carried, initially nothing
        self.carrying = None
        self.key_found = False
        self.door_opened = False

        # Step count since episode start
        self.step_count = 0

        if self.render_mode == "human":
            self.render()

        # Return first observation
        obs = self.gen_obs()

        return obs, {}
            
            
    def step(self, action):
        self.step_count += 1

        reward = 0
        terminated = False
        truncated = False

        # Get the position in front of the agent
        fwd_pos = self.front_pos

        # Get the contents of the cell in front of the agent
        fwd_cell = self.grid.get(*fwd_pos)

        # Check for actions and update state
        if action == self.actions.pickup:
            if fwd_cell and fwd_cell.can_pickup() and self.carrying is None:
                self.carrying = fwd_cell
                self.grid.set(*fwd_pos, None)
                if isinstance(fwd_cell, Key):
                    self.key_found = True  # Flag for key pickup
                    reward += 0.1  # Reward for picking up the key

        elif action == self.actions.toggle:
            if fwd_cell and isinstance(fwd_cell, Door) and self.carrying and isinstance(self.carrying, Key):
                fwd_cell.toggle(self, fwd_pos)
                self.door_opened = True  # Flag for door opening
                reward += 0.1  # Reward for opening the door

        # Rotate left
        if action == self.actions.left:
            self.agent_dir -= 1
            if self.agent_dir < 0:
                self.agent_dir += 4

        # Rotate right
        elif action == self.actions.right:
            self.agent_dir = (self.agent_dir + 1) % 4

        # Move forward
        elif action == self.actions.forward:
            if fwd_cell is None or fwd_cell.can_overlap():
                self.agent_pos = tuple(fwd_pos)
            if fwd_cell is not None and fwd_cell.type == "goal":
                if self.key_found and self.door_opened:
                    terminated = True
                    reward += self._reward()  # Reward for completing all tasks
                else:
                    terminated = True
                    reward -= 0.1
            if fwd_cell is not None and fwd_cell.type == "lava":
                terminated = True
                

        # Check for max steps
        if self.step_count >= self.max_steps:
            truncated = True

        # Render if in human mode
        if self.render_mode == "human":
            self.render()

        obs = self.gen_obs()

        return obs, reward, terminated, truncated, {}



# def main():
#     env = myenv(render_mode="human")

#     # Enable manual control for testing
#     manual_control = ManualControl(env, seed=42)
#     manual_control.start()

    # obs = env.reset()
    # for _ in range(10):  # Just run a few steps for testing
    #     action = env.action_space.sample()  # Random action
    #     obs, reward, done, info = env.step(action)
    #     env.render()  # Make sure to render the environment
    #     if done:
    #         break

# if __name__ == "__main__":
#     main()

In [4]:
class SimpleEnv(MiniGridEnv):
    def __init__(
        self,
        size=7,
        agent_start_pos=(1, 1),
        agent_start_dir=0,
        max_steps: int | None = None,
        **kwargs,
    ):
        self.agent_start_pos = agent_start_pos
        self.agent_start_dir = agent_start_dir
        self.grid_size = size
        
        mission_space = MissionSpace(mission_func=self._gen_mission)

        if max_steps is None:
            max_steps = 4 * size**2

        super().__init__(
            mission_space=mission_space,
            grid_size=size,
            see_through_walls=False,  # Set to True for maximum speed
            max_steps=max_steps,
            **kwargs,
        )

    @staticmethod
    def _gen_mission():
        return "grand mission"

    def _gen_grid(self, width, height):
        self.grid = Grid(width, height)
        self.grid.wall_rect(0, 0, width, height)

        # Dynamic Wall Placement
        wall_col = width - 4  # Adjusted to leave more space on the right
        for i in range(height):
            self.grid.set(wall_col, i, Wall())

        # Randomly place the door in the same column as the wall
        door_pos = (wall_col, random.randint(1, height - 2))
        self.grid.set(*door_pos, Door(COLOR_NAMES[0], is_locked=True))

        # Define the region for the key (left side of the wall)
        key_region_end_x = wall_col - 1

        # Randomly place the key on the left side of the wall
        key_pos = self._random_position(
            exclude=[self.agent_start_pos, door_pos] + [(wall_col, i) for i in range(height)],
            max_x=key_region_end_x
        )
        self.grid.set(*key_pos, Key(COLOR_NAMES[0]))

        # Place the goal in a 3x3 region in the bottom right corner
        goal_region_start_x = width - 3
        goal_region_start_y = height - 3
        goal_pos = (random.randint(goal_region_start_x, width - 2),
                    random.randint(goal_region_start_y, height - 2))
        self.put_obj(Goal(), *goal_pos)

        # Place the agent
        if self.agent_start_pos is not None:
            self.agent_pos = self.agent_start_pos
            self.agent_dir = self.agent_start_dir
        else:
            self.place_agent()

        self.mission = "grand mission"

    def _random_position(self, exclude=[], max_x=None):
        while True:
            x_range = self.grid_size - 2 if max_x is None else max_x
            pos = (random.randint(1, x_range), random.randint(1, self.grid_size - 2))
            if pos not in exclude and self.grid.get(*pos) is None:
                return pos
            
    def reset(self, *, seed=None, options=None):
        super().reset(seed=seed)
        # Reinitialize episode-specific variables
        self.agent_pos = (-1, -1)
        self.agent_dir = -1

        # Generate a new random grid at the start of each episode
        self._gen_grid(self.width, self.height)

        # These fields should be defined by _gen_grid
        assert (
            self.agent_pos >= (0, 0)
            if isinstance(self.agent_pos, tuple)
            else all(self.agent_pos >= 0) and self.agent_dir >= 0
        )

        # Check that the agent doesn't overlap with an object
        start_cell = self.grid.get(*self.agent_pos)
        assert start_cell is None or start_cell.can_overlap()

        # Item picked up, being carried, initially nothing
        self.carrying = None
        self.key_found = False
        self.door_opened = False

        # Step count since episode start
        self.step_count = 0

        if self.render_mode == "human":
            self.render()

        # Return first observation
        obs = self.gen_obs()

        return obs, {}
            
    def step(self, action):
        self.step_count += 1

        reward = 0
        terminated = False
        truncated = False

        # Get the position in front of the agent
        fwd_pos = self.front_pos

        # Get the contents of the cell in front of the agent
        fwd_cell = self.grid.get(*fwd_pos)

        # Check for actions and update state
        if action == self.actions.pickup:
            if fwd_cell and fwd_cell.can_pickup() and self.carrying is None:
                self.carrying = fwd_cell
                self.grid.set(*fwd_pos, None)
                if isinstance(fwd_cell, Key):
                    self.key_found = True  # Flag for key pickup
                    reward += 0.1  # Reward for picking up the key

        elif action == self.actions.toggle:
            if fwd_cell and isinstance(fwd_cell, Door) and self.carrying and isinstance(self.carrying, Key):
                # Toggle the door only if it's not already opened
                if not fwd_cell.is_open:
                    fwd_cell.toggle(self, fwd_pos)
                    if not self.door_opened:  # Check if this is the first time opening the door
                        self.door_opened = True
                        reward += 0.1 

        # Rotate left
        if action == self.actions.left:
            self.agent_dir -= 1
            if self.agent_dir < 0:
                self.agent_dir += 4

        # Rotate right
        elif action == self.actions.right:
            self.agent_dir = (self.agent_dir + 1) % 4

        # Move forward
        elif action == self.actions.forward:
            if fwd_cell is None or fwd_cell.can_overlap():
                self.agent_pos = tuple(fwd_pos)
            if fwd_cell is not None and fwd_cell.type == "goal":
                if self.key_found and self.door_opened:
                    terminated = True
                    reward += self._reward()  # Reward for completing all tasks
                else:
                    terminated = True
                    reward -= 0.1
            if fwd_cell is not None and fwd_cell.type == "lava":
                terminated = True
                

        # Check for max steps
        if self.step_count >= self.max_steps:
            truncated = True

        # Render if in human mode
        if self.render_mode == "human":
            self.render()

        obs = self.gen_obs()

        return obs, reward, terminated, truncated, {}
            


# def main():
#     env = SimpleEnv(render_mode="human")

#     # enable manual control for testing
#     manual_control = ManualControl(env, seed=42)
#     manual_control.start()

    
# if __name__ == "__main__":
#     main()

In [5]:
# env = myenv(render_mode="human")
# manual_control = ManualControl(env)
# manual_control.start()

# if __name__ == "__main__":
#     main()

In [6]:
class MinigridFeaturesExtractor(BaseFeaturesExtractor):
    def __init__(self, observation_space: gym.Space, features_dim: int = 512):
        super().__init__(observation_space, features_dim)
        n_input_channels = observation_space.shape[0]
        self.cnn = nn.Sequential(
            nn.Conv2d(n_input_channels, 16, (2, 2)),
            nn.ReLU(),
            nn.BatchNorm2d(16),
            nn.Conv2d(16, 32, (2, 2)),
            nn.ReLU(),
            nn.BatchNorm2d(32),
            nn.Conv2d(32, 64, (2, 2)),
            nn.ReLU(),
            nn.BatchNorm2d(64),
            nn.AdaptiveAvgPool2d((1, 1)),
            nn.Flatten(),
        )

        flattened_size = 64
        self.linear = nn.Sequential(
            nn.Linear(flattened_size, features_dim),
            nn.ReLU(),
            nn.Dropout(p=0.2) 
        )

    def forward(self, observations: torch.Tensor) -> torch.Tensor:
        return self.linear(self.cnn(observations))

In [7]:
policy_kwargs = dict(
    features_extractor_class=MinigridFeaturesExtractor,
    features_extractor_kwargs=dict(features_dim=128),
)

In [8]:
class ImgObsWrapper2(ObservationWrapper):
    def __init__(self, env):
        super().__init__(env)
        self.observation_space = env.observation_space.spaces["image"]

    def observation(self, obs):
        return obs["image"]

    def step(self, action):
        obs, reward, terminated, truncated, info = self.env.step(action)
        return self.observation(obs), reward, terminated, truncated, info

In [9]:
class CustomRewardCallback(BaseCallback):
    def __init__(self, check_freq, reward_threshold, verbose=1):
        super(CustomRewardCallback, self).__init__(verbose)
        self.check_freq = check_freq
        self.reward_threshold = reward_threshold
        self.total_rewards = 0
        self.episode_rewards = []
        self.mean_rewards = []
        self.fig, self.ax = plt.subplots()

    def _on_step(self) -> bool:
        self.total_rewards += self.locals['rewards'][0]

        if self.locals['dones'][0]:
            self.episode_rewards.append(self.total_rewards)
            current_mean_reward = np.mean(self.episode_rewards[-100:])  
            self.mean_rewards.append(current_mean_reward)
            self.total_rewards = 0

            make_plot(self.episode_rewards, self.mean_rewards, self.fig, self.ax)

            if current_mean_reward >= self.reward_threshold:
                print(f"Stopping training as the mean reward {current_mean_reward} is above the threshold {self.reward_threshold}")
                return False  # Return False to stop the training

        return True


# Setup model and environment
env = SimpleEnv(render_mode="human")
env = ImgObsWrapper(env)
model = PPO("CnnPolicy", env, policy_kwargs=policy_kwargs, verbose=1)

# Instantiate the callback
max_reward = 0.85
callback = CustomRewardCallback(check_freq=1000, reward_threshold=max_reward)

# Train the model
model.learn(total_timesteps=int(2e5), callback=callback)
model.save("PPO_model")

# Optionally save the final plot
plt.savefig('final_training_plot.png')

# Close the environment
env.close()

In [10]:
env = SimpleEnv(size=10, render_mode = 'human')
env = ImgObsWrapper(env)

# Load the trained model, ensure to provide the correct path
model = PPO.load("PPO_model", env=env)

max_reward = 0.9
callback = CustomRewardCallback(check_freq=1000, reward_threshold=max_reward)
model.learn(total_timesteps=int(2e5), callback=callback)
model.save("PPO_model_8x8")
plt.savefig('training_plot_8x8.png')

env.close()

Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Wrapping the env in a VecTransposeImage.
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 311      |
|    ep_rew_mean     | 0.383    |
| time/              |          |
|    fps             | 9        |
|    iterations      | 1        |
|    time_elapsed    | 208      |
|    total_timesteps | 2048     |
---------------------------------
----------------------------------------
| rollout/                |            |
|    ep_len_mean          | 318        |
|    ep_rew_mean          | 0.359      |
| time/                   |            |
|    fps                  | 9          |
|    iterations           | 2          |
|    time_elapsed         | 418        |
|    total_timesteps      | 4096       |
| train/                  |            |
|    approx_kl            | 0.03623655 |
|    clip_fraction        | 0.408      |
|    clip_range           | 0.2        |
|    entr

In [11]:
env = SimpleEnv(size=12, render_mode = 'human')
env = ImgObsWrapper(env)

# Load the trained model, ensure to provide the correct path
model = PPO.load("PPO_model_8x8", env=env)

max_reward = 0.9
callback = CustomRewardCallback(check_freq=1000, reward_threshold=max_reward)
model.learn(total_timesteps=int(2e5), callback=callback)
model.save("PPO_model_10x10")
plt.savefig('training_plot_10x10.png')

env.close()

Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Wrapping the env in a VecTransposeImage.
Stopping training as the mean reward 1.0906250029802322 is above the threshold 0.9


In [12]:
env = SimpleEnv(size=16, render_mode = 'human')
env = ImgObsWrapper(env)

# Load the trained model, ensure to provide the correct path
model = PPO.load("PPO_model_10x10", env=env)

max_reward = 1.0
callback = CustomRewardCallback(check_freq=1000, reward_threshold=max_reward)
model.learn(total_timesteps=int(2e5), callback=callback)
model.save("PPO_model_16x16")
plt.savefig('training_plot_16x16.png')

env.close()

Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Wrapping the env in a VecTransposeImage.
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 235      |
|    ep_rew_mean     | 0.993    |
| time/              |          |
|    fps             | 9        |
|    iterations      | 1        |
|    time_elapsed    | 207      |
|    total_timesteps | 2048     |
---------------------------------
-----------------------------------------
| rollout/                |             |
|    ep_len_mean          | 376         |
|    ep_rew_mean          | 0.849       |
| time/                   |             |
|    fps                  | 9           |
|    iterations           | 2           |
|    time_elapsed         | 416         |
|    total_timesteps      | 4096        |
| train/                  |             |
|    approx_kl            | 0.040102206 |
|    clip_fraction        | 0.324       |
|    clip_range           | 0.2       