In [1]:
# main import
import gymnasium as gym
import numpy as np

from gymnasium.spaces import Box, Discrete

from gymnasium import spaces
from gymnasium.utils import seeding

import math


import copy
import logging



"""
from ..utils.action_space import MultiAgentActionSpace
from ..utils.draw import draw_grid, fill_cell, write_cell_text
from ..utils.observation_space import MultiAgentObservationSpace 
"""

logger = logging.getLogger(__name__)

Handling Multi Agent Action Space

In [2]:
class MultiAgentActionSpace(list):
    def __init__(self, agents_action_space):
        # * checking if all the instances that go through here are of same classes or not
        for x in agents_action_space:
            assert isinstance(x, gym.spaces.space.Space)

        super(MultiAgentActionSpace, self).__init__(agents_action_space)
        self._agents_action_space = agents_action_space

    def sample(self):
        """samples action for each agents from uniform distribution"""
        return [agents_action_space.sample() for agents_action_space in self._agents_action_space] 

Handling Multi Agent Obsercations

In [3]:
class MultiAgentObservationSpace(list):
    def __init__(self, agent_observation_space):
        # * checking if all the instances that go through here are of same classes or not
        for x in agent_observation_space:
            assert isinstance(x. gym.spaces.space.Space)

        super().__init__(agent_observation_space)
        self._agents_observation_space = agent_observation_space

    def sample(self):
        return [agent_observation_space.sample() for agent_observation_space in self._agents_observation_space]

    def contains(self, obs):
        """contains the obsercation of all the agents"""
        for space, obs in zip(self._agents_observation_space, obs):
            if not space.contains(obs):
                return False
            else:
                return True

In [2]:

class Agent:
    def __init__(self, agent_name, agent_index):
        # identity
        self.index = agent_index
        self.agent = agent_name

        # additional attributes
        self.health = None
        self.isHit = False
        self.move = True
        self.movement_speed = 1.00

        # positional attributes
        self.previous_position = np.array([0, 0], dtype=np.float32)
        self.current_position = None
        self.same_position = False

        self.current_step = 0
        self.action = None

        # these are for the angular motion of the agent
        self.angle = 0
        self.center = 0
        self.direction = 0
        self.direction_end = 0

        # this is custom only for the render function
        self.draw_direction_end = 0

    # for handling what the action does
    def agent_action(self, action):
        pass

    # for handling all the initial states
    def agent_reset(self, width, height):
        padding = 30
        # updating the initial random position of the agent at 1st
        self.current_position = np.array(
            [np.random.uniform(30, width - padding), np.random.uniform(30, height - padding)],
            dtype=np.float32)

        # updating the initial orientation to 0 degree at 1st
        theta = math.radians(self.angle)
        magnitude = padding
        # this is for the trigonometry function X and Y
        dir_vec_x = magnitude * math.cos(theta)
        dir_vec_y = magnitude * math.sin(theta)
        
        # adding the direction vector to the center and get an end point for direction
        self.direction_end = np.array([self.current_position[0] + dir_vec_x, self.current_position[1] + dir_vec_y],
                                      dtype=np.float32)
        
        # this part is only for the render function
        self.draw_direction_end = (self.current_position[0] + dir_vec_x, self.current_position[1] + dir_vec_y)

    # updating the direction, line-end according to given angle when called
    def get_direction(self):
        # as render function demands an int value 
        center = (int(self.current_position[0]), int(self.current_position[1]))
        self.center = center
        
        # the X, Y angular equation
        theta = math.radians(self.angle)
        magnitude = 30
        # here is the X=cos()
        directional_vector_x = magnitude * math.cos(theta)
        # here is the Y=sin()
        directional_vector_y = magnitude * math.sin(theta)

        directional_line_end = np.array([center[0] + directional_vector_x, center[1] + directional_vector_y],
                                        dtype=np.float32)
        self.direction_end = directional_line_end
        
        direction = directional_line_end - center
        direction /= np.linalg.norm(direction)
        self.direction = direction
        self.draw_direction_end = (center[0] + directional_vector_x, center[1] + directional_vector_y)

    # for updating the states of the agent when called
    def step_update(self, action, range_x, range_y):

        # ! if used directional rotational movement
        # rotate clockwise
        if action == 0:

            self.angle += 10
            self.angle = self.angle % 360
            # self.get_direction()

        # rotate anti-clockwise
        elif action == 1:
            self.angle -= 10
            self.angle = self.angle % 360
            # self.get_direction()

        # move front
        elif action == 2:
            self.current_position = self.current_position + self.direction * self.movement_speed
            # self.get_direction()

        # move back
        elif action == 3:
            self.current_position = self.current_position - self.direction * self.movement_speed
            # self.get_direction()

        # do nothing / wait
        elif action == 4:
            pass

        self.get_direction()

        self.current_position[0] = np.clip(self.current_position[0], 0, range_x)
        self.current_position[1] = np.clip(self.current_position[1], 0, range_y)

    # this function returns all the state needed for the observations
    # ! can be changed with need for the algorithm
    def get_agent_state(self):
        
        agent_state = {
            'agent_id': self.index,
            'agent_name': self.agent,
            'agent_move_speed': self.movement_speed,
            'agent_current_position': self.current_position,
            'agent_angle': self.angle
        }

        return agent_state


In [8]:
class MultiAgentEnv(gym.Env):
    metadata = {'render_mode':['human', 'rgb_array']}

    def __init__(self, n_prey=1, n_predator=1):
        # defining the screen size (observation render size)
        self.screen_width = 400
        self.screen_height = 400

        # initiating the number of predators and preys
        self._n_prey = n_prey
        self._n_predator = n_predator

        self.n_agents = n_prey + n_predator

        # defining the action spaces for all the agents
        self.action_space = MultiAgentActionSpace(
            [spaces.Discrete(4) for _ in range(self.n_agents)]
        )

        # initiating prey agents' positions
        self.prey_pos = {_ : None for _ in range(self._n_prey)}
        self.prey_prev_pos = {_ : None for _ in range(self._n_prey)}

        # initiating predator agents' positions
        self.predator_pos = {_ : None for _ in range(self._n_predator)}
        self.predator_prev_pos = {_ : None for _ in range(self._n_predator)}

        # initiating their completion state
        self.prey_dones = [None for _ in range(self._n_prey)]
        self.predator_dones = [None for _ in range(self._n_predator)]

        self._total_episode_reward = None
        self.viewer = None
        
        # defining the observation space for all agents
        self.observation_space = MultiAgentObservationSpace(
            [spaces.Box() for _ in range(self.n_agents)]
        )

    def reset(self, seed=0):
        pass

    def step(self, action):
        pass

    def render(self):
        pass

    def close(self):
        pass
