--> Gymnasium is an open-source Python library for developing simulated environments. <br>
--> GIT: https://github.com/Farama-Foundation/Gymnasium <br>
--> DOCUMENT:  https://gymnasium.farama.org/ <br>
--> pip3 install "gymnasium[all]" <br>

In [3]:
import gymnasium as gym
from gymnasium import spaces

import traceback
import numpy as np
import pygame

 # Existing Environment

In [4]:
# Run 100 random steps in a built-in environment with on-screen rendering.
# NOTE(review): newer Gymnasium releases may register this environment under a
# different version suffix; confirm the id against the installed version.
env = gym.make("LunarLander-v2", render_mode="human")
try:
    observation, info = env.reset(seed=42)
    for _ in range(100):
        action = env.action_space.sample()  # this is where you would insert your policy
        observation, reward, terminated, truncated, info = env.step(action)

        # Start a fresh episode as soon as the current one ends for either reason.
        if terminated or truncated:
            observation, info = env.reset()
finally:
    # Always release the environment, even if a step raised.
    env.close()

# Custom Environment

## Maze - v1

''' <br>
<b>Components:</b> <br>
&nbsp;    GRID/Maze: nxn matrix <br>
&nbsp;    Agent: position is always at (0,0) ~ first cell <br>
&nbsp;    Target: position is always at (-1,-1) ~ last cell <br>
&nbsp;    Obstacles: occupy multiple cells that the Agent cannot move into <br>
<b>Actions:</b> <br>
&nbsp;    Agent can move 'up', 'down', 'left', 'right' <br>
<b>Goal:</b> <br>
&nbsp;    Agent should reach target <br>
'''<br>

In [5]:
class Agent:
    """Mutable record of the maze agent: its cell and its recent action history."""

    def __init__(self, x=0, y=0):
        # Coordinates of the cell the agent occupies; used as maze[x, y] indices.
        self.x, self.y = x, y
        self.name = 'MazeAgent'
        # Marker value denoting that a cell contains the agent when the maze is printed.
        self.val = 6
        # Label of the most recent action; empty until an action has been recorded.
        self.last_action = ''
        # Actions recorded by the environment for the agent's current cell.
        self.action_happened = set()

    def __repr__(self):
        pos_x, pos_y = self.x, self.y
        tried, latest = self.action_happened, self.last_action
        return f"Agent:- move: ({pos_x} , {pos_y}) ~ move happened: {tried} ~ last action: {latest}"

In [6]:
class MazeEnv(gym.Env):
    """Grid maze in which an agent walks from the first cell to the last cell.

    Cell values in ``self.maze``: ``0`` free, ``-1`` obstacle, ``1`` target.
    The agent may not leave the grid, enter an obstacle, or revisit a cell.

    Actions (``spaces.Discrete(4)``): ``0`` down, ``1`` right, ``2`` up, ``3`` left.
    Observation: the agent's ``[row, col]`` position as an ``int64`` array.

    Rewards: ``-1`` for a rejected move, ``0.001`` for an accepted move and
    ``1`` for the move that reaches the target. An episode terminates when the
    target is reached, or once all four actions have been rejected since the
    agent's last accepted move (the agent cannot move any more).
    """

    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 4}

    # Action index -> (direction name, (row delta, col delta)).
    _ACTIONS = (
        ("down", (1, 0)),
        ("right", (0, 1)),
        ("up", (-1, 0)),
        ("left", (0, -1)),
    )

    # Obstacle layout as (row, column slice) pairs, laid out for the 4x4 maze.
    _OBSTACLES = (
        (0, slice(1, 3)),
        (1, slice(2, None)),
        (2, slice(0, 1)),
        (3, slice(0, 2)),
    )

    def __init__(self, conf, render_mode=None):
        """Create the maze.

        Args:
            conf: Mapping with ``conf['env']['rows']`` and ``conf['env']['cols']``.
            render_mode: Stored on the instance; ``render()`` prints text
                regardless of its value.
        """
        self.conf = conf
        self.render_mode = render_mode

        rows = self.conf['env']['rows']
        cols = self.conf['env']['cols']

        # One discrete action per entry of _ACTIONS.
        self.action_space = spaces.Discrete(len(self._ACTIONS))

        # Observation is the agent's current cell, e.g. for a 4x4 maze
        # low pos: [0, 0], high pos: [3, 3].
        low = np.array([0, 0], dtype=np.int64)
        high = np.array([rows - 1, cols - 1], dtype=np.int64)
        self.observation_space = spaces.Box(low, high, shape=(2,), dtype=np.int64)

        # Maze layout and a same-shaped grid tracking the agent's visited cells.
        self.maze = np.zeros((rows, cols))
        self.visited = np.zeros((rows, cols))

        # The agent always starts from the (0, 0) cell.
        self.agent = Agent(0, 0)
        self._gen_init_state()

    def reset(self, seed=None, options=None):
        """Start a new episode and return ``(observation, info)``.

        Args:
            seed: Forwarded to ``gym.Env.reset`` to seed ``self.np_random``.
            options: Accepted for compatibility with the Gymnasium API; unused.
        """
        # We need the following line to seed self.np_random.
        super().reset(seed=seed)
        self._gen_init_state()
        return self._get_obs(), self._get_info()

    def step(self, action):
        """Apply ``action`` and return ``(obs, reward, terminated, truncated, info)``.

        ``truncated`` is always ``False``; this environment has no step limit.
        """
        direction, delta = self._action_to_direction(action)

        new_x = self.agent.x + delta[0]
        new_y = self.agent.y + delta[1]

        if self._chk_pos_validity(new_x, new_y):
            self._updt_agent_pos(Agent(new_x, new_y))
            # A successful move clears the record of rejected actions.
            self.agent.action_happened = set()
            terminated = self._win()
            reward = 1 if terminated else 0.001
        else:
            reward = -1
            self.agent.action_happened.add(action)
            # Every action has been rejected here: the agent cannot move any more.
            terminated = len(self.agent.action_happened) == len(self._ACTIONS)

        self.agent.last_action = direction

        return self._get_obs(), reward, terminated, False, self._get_info()

    def render(self):
        """Print the agent and the maze with the agent's cell marked."""
        self._visualize()

    def close(self):
        """Release resources held by the environment (there are none)."""
        pass

    def _get_obs(self):
        """Translate the environment's state into an observation."""
        return np.array([self.agent.x, self.agent.y], dtype=np.int64)

    def _get_info(self):
        """Return auxiliary information: the live grid of visited cells."""
        return {"visited": self.visited}

    def _updt_agent_pos(self, agent):
        """Move ``self.agent`` to ``agent``'s coordinates and mark the cell visited."""
        self.agent.x = agent.x
        self.agent.y = agent.y

        self.visited[self.agent.x, self.agent.y] = 1

    def _chk_pos_validity(self, x, y):
        """Return True if ``(x, y)`` is in bounds, not an obstacle and not yet visited."""
        rows, cols = self.maze.shape

        # The bounds test must come first so the array lookups below are safe.
        if x < 0 or y < 0 or x >= rows or y >= cols:
            return False
        if self.maze[x, y] == -1:
            return False
        return self.visited[x, y] != 1

    def _win(self):
        """Return True when the agent stands on the target cell."""
        return bool(self.maze[self.agent.x, self.agent.y] == 1)

    def _gen_init_state(self):
        """Rebuild the maze and put the agent back at (0, 0) with a clean history."""
        rows = self.maze.shape[0]

        # Placing obstacles in the maze; rows outside a smaller grid are skipped.
        self.maze.fill(0)
        for row, col_slice in self._OBSTACLES:
            if row < rows:
                self.maze[row, col_slice] = -1

        # Target is always at the last cell; set last so no obstacle overwrites it.
        self.maze[-1, -1] = 1

        # Forget the previous episode. A new array is used so that an info dict
        # returned earlier keeps the grid it was given.
        self.visited = np.zeros_like(self.visited)
        self.agent.action_happened = set()
        self.agent.last_action = ''

        # Replacing the agent at the (0, 0) cell.
        self._updt_agent_pos(Agent(0, 0))

    def _visualize(self):
        """Print the agent followed by a copy of the maze showing the agent's marker."""
        maze = self.maze.copy()
        maze[self.agent.x, self.agent.y] = self.agent.val
        print(self.agent)
        print(maze)

    def _action_to_direction(self, action_num):
        """Return ``(direction name, [row delta, col delta] array)`` for an action index."""
        name, delta = self._ACTIONS[action_num]
        return name, np.array(delta)

In [7]:
def get_conf():
    """Return a fresh configuration dict describing a 4x4 grid under the 'env' key."""
    grid_shape = dict(rows=4, cols=4)
    return {'env': grid_shape}

In [8]:
def test_env(env):
    """Smoke-test ``env`` by playing two episodes with randomly sampled actions.

    Each episode renders the state before every step and once more at the
    end, then prints the episode's accumulated reward.

    Args:
        env: Object providing ``reset()``, ``render()``, ``step(action)``,
            ``close()`` and ``action_space.sample()``.
    """
    test_episodes = 2

    try:
        for episode in range(1, test_episodes + 1):
            env.reset()
            terminated = truncated = False
            score = 0

            # Stop on either end signal; looping on `terminated` alone would
            # never finish on an environment that only truncates.
            while not (terminated or truncated):
                env.render()
                action = env.action_space.sample()
                _, reward, terminated, truncated, _ = env.step(action)
                score += reward

            env.render()
            print(f"after episode:- {episode}, score:- {score}")
    finally:
        # Close the environment even when an episode raised.
        env.close()

In [9]:
def main():
    """Build a MazeEnv from get_conf(), exercise it with test_env, and print "DONE".

    Any exception is reported with a printed traceback instead of propagating.
    """
    try:
        # Create the environment ~ MAZE from the default configuration.
        maze_env = MazeEnv(get_conf())

        # Checking whether the environment is working or not.
        test_env(maze_env)

        print("DONE")
    except Exception:
        traceback.print_exc()


if __name__ == "__main__":
    main()

Agent:- move: (0 , 0) ~ move happened: set() ~ last action: 
[[ 6. -1. -1.  0.]
 [ 0.  0. -1. -1.]
 [-1.  0.  0.  0.]
 [-1. -1.  0.  1.]]
Agent:- move: (0 , 0) ~ move happened: {2} ~ last action: up
[[ 6. -1. -1.  0.]
 [ 0.  0. -1. -1.]
 [-1.  0.  0.  0.]
 [-1. -1.  0.  1.]]
Agent:- move: (0 , 0) ~ move happened: {2} ~ last action: up
[[ 6. -1. -1.  0.]
 [ 0.  0. -1. -1.]
 [-1.  0.  0.  0.]
 [-1. -1.  0.  1.]]
Agent:- move: (0 , 0) ~ move happened: {1, 2} ~ last action: right
[[ 6. -1. -1.  0.]
 [ 0.  0. -1. -1.]
 [-1.  0.  0.  0.]
 [-1. -1.  0.  1.]]
Agent:- move: (0 , 0) ~ move happened: {1, 2, 3} ~ last action: left
[[ 6. -1. -1.  0.]
 [ 0.  0. -1. -1.]
 [-1.  0.  0.  0.]
 [-1. -1.  0.  1.]]
Agent:- move: (0 , 0) ~ move happened: {1, 2, 3} ~ last action: right
[[ 6. -1. -1.  0.]
 [ 0.  0. -1. -1.]
 [-1.  0.  0.  0.]
 [-1. -1.  0.  1.]]
Agent:- move: (0 , 0) ~ move happened: {1, 2, 3} ~ last action: right
[[ 6. -1. -1.  0.]
 [ 0.  0. -1. -1.]
 [-1.  0.  0.  0.]
 [-1. -1.  0.  1.]]
Ag

# Extra

In [10]:
# Discrete(4): sampling yields one integer from {0, 1, 2, 3}
discrete_space = spaces.Discrete(4)
discrete_space_sample = discrete_space.sample()
print(discrete_space_sample)

1


In [11]:
# Box with shape (2,) and an integer dtype: sampling yields a 2-element array
# whose entries lie between low and high
box_space = spaces.Box(low = 0, high = 10, shape=(2,), dtype=int)
box_space_sample = box_space.sample()
print(box_space_sample)

[ 5 10]


In [12]:
# Dict space: sampling yields a mapping with one sample per named sub-space
# ("agent" from the Discrete space, "target" from the Box space)
dict_space = spaces.Dict(
    {
        "agent": spaces.Discrete(4),
        "target": spaces.Box(low = 0, high = 10, shape=(2,), dtype=int),
    }
)
dict_space_sample = dict_space.sample()
print(dict_space_sample)

OrderedDict([('agent', 2), ('target', array([0, 3]))])
