# Environment Demo

In [17]:
import numpy as np
import pygame

import gymnasium as gym
from gymnasium import spaces
from math import sqrt, atan2, degrees

## Robot features

- 2.5 cm diameter
- compass
- 360 vision sensor and object reconition in range 0.5m
- comunication between others robots (0.5m range)
- ability to pick up stuff (in they're in the same position of the object)
- holonomic motion (every directions)

In [18]:
ROBOT_SIZE = 2.5
SENSOR_RANGE = 50

Since we are in a 2d gridworld environment a robot can move only in a discrete space of directions. The robot can move in 8 directions (N, NE, E, SE, S, SW, W, NW). The robot can also pick up objects and put them down.

In [19]:
STAY = 0
N = 1
NE = 2
E = 3
SE = 4
S = 5
SW = 6
W = 7
NW = 8
PICK_UP = 9
PUT_DOWN = 10

The robots have 16 sensors, so it's safe to assume that they could detect the closest objects in all 360 angle. The sensor also allows the robot to detect the distance to the object and the type of the object. So we can define the sensor as a tuple (angle, distance, object_type) and each robot would have a list of 16 of these tuples.

## Arena

5m x 5m with robots and colored objects 

In [20]:
ARENA_SIZE = 500 / ROBOT_SIZE

## Environment construction

In [606]:
class GridWorldEnv(gym.Env):
    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 4}
    def __init__(
            self, 
            render_mode=None, 
            size=100, 
            n_agents=3, 
            n_blocks=3, 
            n_sensors = 6, 
            sensor_range=5,
            sensor_degree = 360,
            seed=None
            ):
        
        self.size = size  # The size of the square grid
        self.window_size = 512  # The size of the PyGame window

        self._n_agents = n_agents
        self.n_blocks = n_blocks

        self._n_sensors = n_sensors
        self._sensors_range = sensor_range
        self._sensors_degree = sensor_degree
        self._sensors_angle = self._sensors_degree / self._n_sensors
        self._sensors = np.zeros((self._n_agents, n_sensors, 3), dtype=float) # init sensors

        self._agents_locations = np.zeros((self._n_agents, 2), dtype=int)
        self._agents_picked_up = np.full(self._n_agents, -1, dtype=int)

        self._blocks_location = np.zeros((self.n_blocks, 2), dtype=int)
        self._blocks_colors = np.zeros(self.n_blocks, dtype=int)
        self._blocks_picked_up = np.full(self.n_blocks, -1, dtype=int)

        self.color_map = {
            2: "\033[91m",  # Red
            3: "\033[94m",  # Blue
            4: "\033[92m",  # Green
            5: "\033[93m",  # Yellow
            6: "\033[95m",  # Purple
            7: "\033[33m",   # Orange
            8: "\033[90m",   # Dark Gray
        }
        self.reset_color = "\033[0m"  # Resets color to default
        
        # All directions
        self._action_to_direction = {
            STAY : np.array([0, 0]),
            N : np.array([-1, 0]),
            NE : np.array([-1, 1]),
            E : np.array([0, 1]),
            SE : np.array([1, 1]),
            S : np.array([1, 0]),
            SW : np.array([1, -1]),
            W : np.array([0, -1]),
            NW : np.array([-1, -1]),
        }
        self._other_actions = [PICK_UP, PUT_DOWN]

        n_total_actions = len(self._action_to_direction) + len(self._other_actions)
        
        self.action_space = spaces.Tuple([spaces.Discrete(n_total_actions) for _ in range(self._n_agents)])

        self.observation_space = spaces.Dict(
            {
                "sensors": spaces.Box(0, 255, shape=(self._n_agents, n_sensors, 3), dtype=float),
            }
        )

        assert render_mode is None or render_mode in self.metadata["render_modes"]
        self.render_mode = render_mode
        self.window = None
        self.clock = None

    def _calculate_distance_direction(self, pointA, pointB, distance_type='euclidean'):
        """
        Calculate the distance and direction in degrees from pointA to pointB in a grid world.

        Parameters:
        - pointA: Tuple[int, int] representing the coordinates of the first point (x1, y1).
        - pointB: Tuple[int, int] representing the coordinates of the second point (x2, y2).
        - distance_type: String indicating the type of distance to calculate ('manhattan' or 'euclidean').

        Returns:
        - distance: The calculated distance between the two points.
        - direction_degrees: The direction from pointA to pointB in degrees from 0 to 360.
        """
        x1, y1 = pointA
        x2, y2 = pointB

        # Calculate distance
        if distance_type == 'manhattan':
            distance = abs(x1 - x2) + abs(y1 - y2)
        elif distance_type == 'euclidean':
            distance = sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)
        else:
            raise ValueError("Invalid distance type. Use 'manhattan' or 'euclidean'.")

        # Calculate direction in degrees
        angle_radians = atan2(y2 - y1, x2 - x1)
        direction_degrees = degrees(angle_radians)
        # TODO: south is 0 degrees, east is 90 degrees, north is 180 degrees, west is -90 degrees

        return distance, direction_degrees
    
    def _get_obs(self):
        # Reset sensors
        self._sensors = np.zeros((self._n_agents, self._n_sensors, 3), dtype=float)
        
        # Mimic sensors reading
        for i in range(self._n_agents):
            
            # Check if the sensors detect other agents
            for j in range(self._n_agents):
                if i != j:
                    distance, direction = self._calculate_distance_direction(self._agents_locations[i], 
                                                                            self._agents_locations[j])
                    if distance <= self._sensors_range: # If the other agent is within the sensor range
                        sensor_index = int(direction / self._sensors_angle)
                        # The sensor captures only the closest agent
                        if (self._sensors[i, sensor_index, 2] != 0 and self._sensors[i, sensor_index, 1] > distance) or self._sensors[i, sensor_index, 2] == 0:
                            self._sensors[i, sensor_index] = [direction, distance, 1] # 1 is the identifier for an agent
            
            # Check if the sensors detect blocks
            for j in range(self.n_blocks):
                distance, direction = self._calculate_distance_direction(self._agents_locations[i], 
                                                                        self._blocks_location[j])
                if distance <= self._sensors_range: # If the block is within the sensor range
                    sensor_index = int(direction / self._sensors_angle)
                    # The sensor captures only the closest block
                    if (self._sensors[i, sensor_index, 2] != 0 and self._sensors[i, sensor_index, 1] > distance) or self._sensors[i, sensor_index, 2] == 0:
                        self._sensors[i, sensor_index] = [direction, distance, self._blocks_colors[j]]
        
        return {"sensors": self._sensors, "agents_block_picked": self._agents_picked_up}
                
    def reset(self, seed=None, options=None):
        # We need the following line to seed self.np_random
        super().reset(seed=seed)
        self._agents_picked_up = np.full(self._n_agents, -1, dtype=int)
        self._blocks_picked_up = np.full(self.n_blocks, -1, dtype=int)
       
        # Choose the agent's location uniformly at random
        for i in range(self._n_agents):
            # Check if the agents are not spawning in the same location
            while True:
                self._agents_locations[i] = self.np_random.integers(0, self.size, size=2, dtype=int)
                if not np.any(np.all(self._agents_locations[i] == self._agents_locations[:i], axis=1)):
                    break

        for i in range(self.n_blocks):
            # Check if the blocks are not spawning in the same location
            while True:
                self._blocks_location[i] = self.np_random.integers(0, self.size, size=2, dtype=int) 
                if not np.any(np.all(self._blocks_location[i] == self._blocks_location[:i], axis=1)):
                    break
            self._blocks_colors[i] = self.np_random.integers(2, 2 + len(self.color_map), dtype=int)
        
        observation = self._get_obs()
        # info = self._get_info()
        info = None

        return observation, info
    
    def step(self, action):
        for i in range(self._n_agents):
            
            if action[i] == PICK_UP:
                if self._agents_picked_up[i] == -1: # If the agent is not carrying a block
                    for j in range(self.n_blocks):
                        # If the agent is in the same location as the block
                        if np.array_equal(self._agents_locations[i], self._blocks_location[j]): 
                            self._blocks_location[j] = (-1,-1) # Set as picked up
                            self._agents_picked_up[i] = self._blocks_colors[j] # The agent knows the color of the block it picked up
                            self._blocks_picked_up[j] = i # The block is picked up by the agent
            
            if action[i] == PUT_DOWN:
                if self._agents_picked_up[i] != -1: # If the agent is carrying a block
                    for j in range(self.n_blocks):
                        if (self._blocks_picked_up[j] == i): # If the block is picked up by the agent
                            self._blocks_location[j] = self._agents_locations[i] # Set the block location to the agent location
                            self._agents_picked_up[i] = -1 # The agent is not carrying a block anymore
                            self._blocks_picked_up[j] = -1 # The block is not picked up by any agent anymore

            if action[i] == STAY:
                continue

            if action[i] in self._action_to_direction:
                # Map the action to the direction we walk in
                direction = self._action_to_direction[action[i]]
                
                # We use `np.clip` to make sure we don't leave the grid
                new_position = np.clip(
                    self._agents_locations[i] + direction, 0, self.size - 1
                )
                
                # Check if the new position is not occupied by another agent
                occupied_by_agent = np.any(np.all(new_position == self._agents_locations, axis=1))
                # Check if the new position is not occupied by a block while carrying one (can't pick up two blocks)
                occupied_by_block_while_carrying = np.any(np.all(new_position == self._blocks_location, axis=1)) and self._agents_picked_up[i] != -1
                if not occupied_by_agent and not occupied_by_block_while_carrying:
                    self._agents_locations[i] = new_position
        # An episode is done iff the goal is met
        # terminated = np.array_equal(self._agent_location, self._target_location)
        # reward = 1 if terminated else 0  # Binary sparse rewards
        observation = self._get_obs()

        return observation, None, None, False, None
    
    def print_env(self):
        for x in range(self.size):
            for y in range(self.size):
                flag = False
                print_symbol = ""
                # Check for blocks
                for i, block in enumerate(self._blocks_location):
                    if x == block[0] and y == block[1]:
                        
                        color_id = self._blocks_colors[i]
                        color_code = self.color_map.get(color_id, self.reset_color)
                        print_symbol += f"{color_code}O{self.reset_color}"
                        # print(f"{color_code}O{self.reset_color}", end=" ")
                        flag = True

                # Check for agents
                for i, agent in enumerate(self._agents_locations):
                    if x == agent[0] and y == agent[1]:
                        
                        if self._agents_picked_up[i] != -1:
                            color_id = self._agents_picked_up[i]
                            color_code = self.color_map.get(color_id, self.reset_color)
                            print_symbol += f"{color_code}{i}{self.reset_color}"
                        else:
                            print_symbol += f"{i}"
                        flag = True
                
                # If there are no agents or blocks, print an empty space
                if not flag:
                    print(".", end=" ")
                else:
                    print(print_symbol, end=" ")
                          
            print()
        
    def print_sensors(self):
        for i in range(self._n_agents):
            flag = False
            for j in range(self._n_sensors):
                if self._sensors[i,j,2] != 0:
                    entity = "agent"
                    if self._sensors[i,j,2] != 1:
                        entity = f"block (color: {self._sensors[i,j,2]})"
                    direction = self._sensors[i,j,0]
                    distance = self._sensors[i,j,1]
                    print(f"Agent {i} sees {entity} at {direction} degrees and {distance} distance")
                    flag = True
            if not flag:
                print(f"Agent {i} doesn't see anything")
            print()
        
    def close(self):
        if self.window is not None:
            pygame.display.quit()
            pygame.quit()

In [706]:
env = GridWorldEnv(render_mode='rgb_array', size=20, n_agents=5, n_blocks=5)
env.reset() # Initial state

({'sensors': array([[[   0.        ,    0.        ,    0.        ],
          [   0.        ,    0.        ,    0.        ],
          [   0.        ,    0.        ,    0.        ],
          [   0.        ,    0.        ,    0.        ],
          [   0.        ,    0.        ,    0.        ],
          [   0.        ,    0.        ,    0.        ]],
  
         [[  26.56505118,    4.47213595,    3.        ],
          [   0.        ,    0.        ,    0.        ],
          [   0.        ,    0.        ,    0.        ],
          [   0.        ,    0.        ,    0.        ],
          [   0.        ,    0.        ,    0.        ],
          [   0.        ,    0.        ,    0.        ]],
  
         [[   0.        ,    2.        ,    5.        ],
          [   0.        ,    0.        ,    0.        ],
          [ 135.        ,    2.82842712,    5.        ],
          [   0.        ,    0.        ,    0.        ],
          [   0.        ,    0.        ,    0.        ],
          [ 

In [708]:
env.print_env()

. . . . . . . . . . . . . . . . 1 . . . 
. . . . . . . . . . . . . . . . . . . . 
. . . . . . . . . 0 . . . . . . . . . . 
. . . . . . . . . . . . . . . . . . . . 
. . . . . . . . . . . . . . . . . . [94mO[0m . 
. . . . . . . . . . . . . . . . . . . . 
. . . . . . . . . . . . . . . . . . . . 
. . . . . [90mO[0m . . . . . . . . . . . . . . 
. . . . . . . . . . . . . . . . . . . . 
. . . . . . . . . . . . . . . . . . . . 
. . . . . . . . . . . . . . . . . . . . 
. . . . . . . . . . . . . . . . . . . . 
. . . . . . [93mO[0m . . . . . . . [95mO[0m . . . . . 
. . . . . . . . . . . . . . . . . . 4 . 
. . . . 2 . . . . . . . . . . . . . . . 
. . . . . . . . . . . . . . . . . . 3 . 
. . . . [93mO[0m . . . . . . . . . . . . . . . 
. . . . . . . . . . . . . . . . . . . . 
. . . . . . . . . . . . . . . . . . . . 
. . . . . . . . . . . . . . . . . . . . 


In [709]:
env.print_sensors()

Agent 0 doesn't see anything

Agent 1 sees block (color: 3.0) at 26.56505117707799 degrees and 4.47213595499958 distance

Agent 2 sees block (color: 5.0) at 0.0 degrees and 2.0 distance
Agent 2 sees block (color: 5.0) at 135.0 degrees and 2.8284271247461903 distance

Agent 3 sees agent at 180.0 degrees and 2.0 distance
Agent 3 sees block (color: 6.0) at -126.86989764584402 degrees and 5.0 distance

Agent 4 sees agent at 0.0 degrees and 2.0 distance
Agent 4 sees block (color: 6.0) at -104.03624346792648 degrees and 4.123105625617661 distance



## Little Demo for actions

- STAY = 0
- N = 1
- NE = 2
- E = 3
- SE = 4
- S = 5
- SW = 6
- W = 7
- NW = 8
- PICK_UP = 9
- PUT_DOWN = 10

In [714]:
action = env.action_space.sample()
# action = [0, 9, 0, 0, 0]
next_state, _, _, _, _ = env.step(action)
env.print_env()

. . . . . . . . . . . . . . . . . 1 . . 
. . . . . . . . . . . . . . . . . . . . 
. . . . . . . . . . . . . . . . . . . . 
. . . . . . . . . . 0 . . . . . . . . . 
. . . . . . . . . . . . . . . . . . [94mO[0m . 
. . . . . . . . . . . . . . . . . . . . 
. . . . . . . . . . . . . . . . . . . . 
. . . . . [90mO[0m . . . . . . . . . . . . . . 
. . . . . . . . . . . . . . . . . . . . 
. . . . . . . . . . . . . . . . . . . . 
. . . . . . . . . . . . . . . . . . . . 
. . . . . . . . . . . . . . . . . . . . 
. . . . . . [93mO[0m . . . . . . . [95mO[0m . . . 4 . 
. . . . . . . . . . . . . . . . . . . . 
. . . 2 . . . . . . . . . . . . . . 3 . 
. . . . . . . . . . . . . . . . . . . . 
. . . . [93mO[0m . . . . . . . . . . . . . . . 
. . . . . . . . . . . . . . . . . . . . 
. . . . . . . . . . . . . . . . . . . . 
. . . . . . . . . . . . . . . . . . . . 


In [715]:
env.print_sensors()

Agent 0 doesn't see anything

Agent 1 sees block (color: 3.0) at 14.036243467926479 degrees and 4.123105625617661 distance

Agent 2 sees block (color: 5.0) at 26.56505117707799 degrees and 2.23606797749979 distance
Agent 2 sees block (color: 5.0) at 123.69006752597979 degrees and 3.605551275463989 distance

Agent 3 sees agent at 180.0 degrees and 2.0 distance
Agent 3 sees block (color: 6.0) at -116.56505117707799 degrees and 4.47213595499958 distance

Agent 4 sees agent at 0.0 degrees and 2.0 distance
Agent 4 sees block (color: 6.0) at -90.0 degrees and 4.0 distance



In [716]:
next_state

{'sensors': array([[[   0.        ,    0.        ,    0.        ],
         [   0.        ,    0.        ,    0.        ],
         [   0.        ,    0.        ,    0.        ],
         [   0.        ,    0.        ,    0.        ],
         [   0.        ,    0.        ,    0.        ],
         [   0.        ,    0.        ,    0.        ]],
 
        [[  14.03624347,    4.12310563,    3.        ],
         [   0.        ,    0.        ,    0.        ],
         [   0.        ,    0.        ,    0.        ],
         [   0.        ,    0.        ,    0.        ],
         [   0.        ,    0.        ,    0.        ],
         [   0.        ,    0.        ,    0.        ]],
 
        [[  26.56505118,    2.23606798,    5.        ],
         [   0.        ,    0.        ,    0.        ],
         [ 123.69006753,    3.60555128,    5.        ],
         [   0.        ,    0.        ,    0.        ],
         [   0.        ,    0.        ,    0.        ],
         [   0.        ,    0. 