In [43]:
import numpy as np
import pandas as pd 
import gymnasium as gym
from gymnasium import spaces
from stable_baselines3 import DQN

In [44]:
class maze_env(gym.Env):

    def __init__(self, maze_grid):
        
        super().__init__()

        maze_grid[maze_grid == "Buddy"] = 4
        #maze_grid[maze_grid == "T"] = 2
        maze_grid[maze_grid == "Rover"] = 3
        maze_grid = maze_grid.astype(int)

        self.begin_maze_grid = maze_grid
        self.maze= np.copy(self.begin_maze_grid)
        self.action_space = spaces.Discrete(4)
        self.shape = self.maze.shape
        self.observation_space = spaces.Box(low=0, high=4, shape=self.shape, dtype=int)

        self.buddy_pos = np.argwhere(self.maze==4)[0]
        self.rover_pos = np.argwhere(self.maze==3)[0]
        self.start_pos = np.copy(self.buddy_pos)
        self.old_pos = None
        #self.prev_pos = []
        self.goal_reached_count = 0


    def step(self, action):
        
        self.moves += 1
        #print(f"action: {action}")
        new_pos = np.copy(self.buddy_pos)
        #mage_pos = np.argwhere(self.maze == "2")
        terminated = False
        truncated = False
        
        #self.prev_pos.append(tuple(self.buddy_pos))

        if action == 0:  # Left
            new_pos[1] -= 1
        elif action == 1:  # Right
            new_pos[1] += 1
        elif action == 2:  # Up
            new_pos[0] -= 1
        elif action == 3:  # Down
            new_pos[0] += 1

        #print(f"out of bounds : {self.is_out_of_bounds(new_pos, self.maze)}")

        if (self.is_out_of_bounds(new_pos, self.maze)):
            reward = -0.2

        elif (np.any(np.all(np.argwhere(self.maze==0) == new_pos, axis=1))):
            reward = -0.2

        else:

            if (np.any(np.all(np.argwhere(self.maze==1) == new_pos, axis=1))):
                self.old_pos = np.copy(self.buddy_pos)
                self.maze[tuple(self.buddy_pos)], self.maze[tuple(new_pos)] = self.maze[tuple(new_pos)], self.maze[tuple(self.buddy_pos)]
                self.buddy_pos = np.copy(new_pos)
                reward = - 0.02 
            
            else:
                reward = 1
                self.maze[tuple(self.buddy_pos)] = 0
                terminated = True

        observation = np.copy(self.maze)

        #print("Observation content (step):\n", observation)

        if (terminated):
            self.goal_reached_count = self.goal_reached_count + 1

        #print(f"No. of times goal reached = {self.goal_reached_count}")

        return  observation, reward, terminated, truncated, {}

    def is_out_of_bounds(self, coord, array):

        row, col = coord
        max_row, max_col = array.shape

        return row < 0 or row >= max_row or col < 0 or col >= max_col

    def reset(self, seed=None, options=None):
        
        self.maze = np.copy(self.begin_maze_grid)
        self.buddy_pos = np.copy(self.start_pos)
        observation = np.copy(self.maze)
        self.moves = 0

        #print("Observation content (reset):\n", observation)

        return observation, None

    def render(self,mode="human",close=False):

        if close:
            return
        if mode == 'human':
            for i in range(self.shape[0]):
                for j in range(self.shape[1]):
                    if self.maze[i, j] == 4:
                        print(" B ", end=" ")  # Agent
                    elif self.maze[i, j] == 3:
                        print(" G ", end=" ")  # Goal
                    elif self.maze[i, j] == 0:
                        print(" # ", end=" ")  # Wall
                    elif self.maze[i, j] == 2:
                        print(" M ", end=" ")  # Magician
                    else:
                        print(" . ", end=" ")  # Empty space
                print()

In [45]:
maze_array = np.array([
    ["Buddy", 0, 1, 1, 1, 1, 1, 1, 1, 1],
    [1, 1, 1, 1, 1, 0, 1, 1, 1, 1],
    [1, 1, 1, 1, 1, 0, 1, 1, 1, 1],
    [0, 0, 1, 0, 0, 1, 0, 1, 1, 1],
    [1, 1, 0, 1, 0, 1, 0, 0, 0, 1],
    [1, 1, 0, 1, 0, 1, 1, 1, 1, 1],
    [1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
    [1, 1, 1, 1, 1, 1, 0, 0, 0, 0],
    [1, 0, 0, 0, 0, 0, 1, 1, 1, 1],
    [1, 1, 1, 1, 1, 1, 1, 0, 1, "Rover"]
],dtype=object)               

env = maze_env(maze_array)

env.render()


 B   #   .   .   .   .   .   .   .   .  
 .   .   .   .   .   #   .   .   .   .  
 .   .   .   .   .   #   .   .   .   .  
 #   #   .   #   #   .   #   .   .   .  
 .   .   #   .   #   .   #   #   #   .  
 .   .   #   .   #   .   .   .   .   .  
 .   .   .   .   .   .   .   .   .   .  
 .   .   .   .   .   .   #   #   #   #  
 .   #   #   #   #   #   .   .   .   .  
 .   .   .   .   .   .   .   #   .   G  


In [46]:
env.observation_space.sample()

array([[2, 2, 3, 1, 0, 0, 1, 0, 2, 2],
       [3, 1, 2, 4, 3, 0, 2, 1, 4, 4],
       [1, 3, 4, 4, 0, 3, 3, 0, 2, 1],
       [1, 2, 1, 3, 3, 4, 3, 3, 2, 1],
       [0, 1, 2, 4, 1, 0, 4, 4, 1, 4],
       [4, 0, 0, 4, 0, 4, 2, 1, 2, 1],
       [1, 1, 3, 1, 1, 4, 0, 3, 1, 2],
       [2, 0, 0, 3, 3, 3, 3, 1, 3, 1],
       [4, 0, 3, 0, 2, 2, 2, 4, 1, 3],
       [2, 0, 4, 1, 4, 0, 0, 0, 3, 3]])

In [47]:
model = DQN("MlpPolicy", env, verbose=1, exploration_fraction=0.01)
model = model.learn(total_timesteps=int(1000000))  

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 5.26e+04 |
|    ep_rew_mean      | -1.3e+03 |
|    exploration_rate | 0.05     |
| time/               |          |
|    episodes         | 4        |
|    fps              | 601      |
|    time_elapsed     | 349      |
|    total_timesteps  | 210316   |
| train/              |          |
|    learning_rate    | 0.0001   |
|    loss             | 2.77e-05 |
|    n_updates        | 52553    |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 3.38e+04 |
|    ep_rew_mean      | -820     |
|    exploration_rate | 0.05     |
| time/               |          |
|    episodes         | 8        |
|    fps              | 651      |
|    time_elapsed     | 415      |
|    total_timesteps  | 270793   |
| train/              |        

KeyboardInterrupt: 

In [48]:
obs, info = env.reset()
done = False
i = 1
while not done:
    print(f"Step: {i}")
    
    action, _ = model.predict(obs)
    
    print("Predicted action:", action)
    
    obs, reward, done, truncated, info = env.step(action)
    print("Reward:", reward)
    
    env.render()
    
    i += 1

    if (i>50):
        print("Could not finish maze")
        break

Step: 1
Predicted action: 3
Reward: -0.02
 .   #   .   .   .   .   .   .   .   .  
 B   .   .   .   .   #   .   .   .   .  
 .   .   .   .   .   #   .   .   .   .  
 #   #   .   #   #   .   #   .   .   .  
 .   .   #   .   #   .   #   #   #   .  
 .   .   #   .   #   .   .   .   .   .  
 .   .   .   .   .   .   .   .   .   .  
 .   .   .   .   .   .   #   #   #   #  
 .   #   #   #   #   #   .   .   .   .  
 .   .   .   .   .   .   .   #   .   G  
Step: 2
Predicted action: 1
Reward: -0.02
 .   #   .   .   .   .   .   .   .   .  
 .   B   .   .   .   #   .   .   .   .  
 .   .   .   .   .   #   .   .   .   .  
 #   #   .   #   #   .   #   .   .   .  
 .   .   #   .   #   .   #   #   #   .  
 .   .   #   .   #   .   .   .   .   .  
 .   .   .   .   .   .   .   .   .   .  
 .   .   .   .   .   .   #   #   #   #  
 .   #   #   #   #   #   .   .   .   .  
 .   .   .   .   .   .   .   #   .   G  
Step: 3
Predicted action: 1
Reward: -0.02
 .   #   .   .   .   .   .   .   .   .  
 .   .   B   