In [1]:
import copy
import heapq
import random

import numpy as np

The Agent Class:

In [2]:
class Agent:
  """An explorer on a grid stage that remembers which cells it has seen.

  Attributes:
    x, y: current row and column of the agent.
    goal: the (row, col) cell the agent is trying to reach.
    real_stage: the true map the agent reads from when it looks around.
    explored_stage: the agent's private map, same shape as `real_stage`;
      -1 marks a cell it has not seen yet.
  """

  def __init__(self, start: tuple, goal: tuple, real_stage):
    self.x, self.y = start[0], start[1]
    self.goal = goal
    self.real_stage = real_stage
    # Nothing is known at first except that the agent's own cell is free.
    self.explored_stage = np.full_like(real_stage, -1)
    self.explored_stage[self.x, self.y] = 0
    self.agent_view()

  def agent_view(self):
    """Copy the four in-bounds neighbour cells from the real stage."""
    last_row = len(self.real_stage) - 1
    last_col = len(self.real_stage[0]) - 1

    # (is the neighbour inside the stage?, neighbour cell): up, down, left, right
    neighbours = (
        (self.x > 0, (self.x - 1, self.y)),
        (self.x < last_row, (self.x + 1, self.y)),
        (self.y > 0, (self.x, self.y - 1)),
        (self.y < last_col, (self.x, self.y + 1)),
    )
    for inside, cell in neighbours:
      if inside:
        self.explored_stage[cell] = self.real_stage[cell]

    # Any 2 copied from the real stage is stored as a free cell (0).
    self.explored_stage[self.explored_stage == 2] = 0

  def check_goal(self):
    """Return True when the agent stands on its goal cell."""
    return (self.x, self.y) == self.goal

A* Algorithm:

In [3]:
class Node:
    """A search node for A*: one grid cell plus its costs and back-pointer."""

    def __init__(self, x, y, g_cost, h_cost):
        self.x = x
        self.y = y
        self.g_cost = g_cost  # number of steps taken from the start to this cell
        self.h_cost = h_cost  # heuristic estimate of the steps left to the goal
        self.parent = None    # previous node on the path; stays None for the start

    def f_cost(self):
        """Return g_cost + h_cost, the value the open set is ordered by."""
        return self.g_cost + self.h_cost

def is_valid(x, y, grid):
    """Return whether (x, y) lies inside `grid` and holds a value <= 0.

    Cells with a positive value are treated as blocked; 0 and negative values
    are treated as walkable.
    """
    rows, cols = grid.shape
    return 0 <= x < rows and 0 <= y < cols and grid[x, y] <= 0

def a_star(grid, start, end):
    """Find a shortest 4-connected path from `start` to `end` with A*.

    Each step costs 1 and the heuristic is the Manhattan distance to `end`.
    The start cell itself is not validated, so it may hold a blocking value
    (for example the marker of the agent standing on it).

    Args:
        grid: 2-D NumPy array; a cell is walkable when `is_valid` accepts it.
        start: (row, col) of the first cell.
        end: (row, col) of the target cell; any 2-item sequence is accepted.

    Returns:
        A list of (row, col) tuples from `start` to `end` inclusive, or None
        when `end` cannot be reached.
    """
    directions = [(0, 1), (1, 0), (0, -1), (-1, 0)]  # Right, Down, Left, Up
    start = (start[0], start[1])
    end = (end[0], end[1])  # normalise so lists/arrays compare like tuples

    start_node = Node(start[0], start[1], 0, 0)

    # Heap entries are (f_cost, insertion order, node). The insertion counter
    # breaks ties in favour of the earliest-added node and keeps Node objects
    # from ever being compared with each other.
    pushed = 0
    open_heap = [(start_node.f_cost(), pushed, start_node)]
    best_g = {start: 0}  # cheapest g_cost queued so far for each cell
    closed_set = set()

    while open_heap:
        _, _, current_node = heapq.heappop(open_heap)
        position = (current_node.x, current_node.y)

        # A cell can be queued more than once; only its first pop counts.
        if position in closed_set:
            continue
        closed_set.add(position)

        # Target reached -> walk the parent links back to the start.
        if position == end:
            path = []
            node = current_node
            while node is not None:
                path.append((node.x, node.y))
                node = node.parent
            path.reverse()
            return path

        for dx, dy in directions:
            neighbour = (current_node.x + dx, current_node.y + dy)
            if neighbour in closed_set or not is_valid(neighbour[0], neighbour[1], grid):
                continue

            g_cost = current_node.g_cost + 1  # every step costs 1
            if neighbour in best_g and g_cost >= best_g[neighbour]:
                continue  # an equal or cheaper route to this cell is already queued
            best_g[neighbour] = g_cost

            h_cost = abs(neighbour[0] - end[0]) + abs(neighbour[1] - end[1])
            new_node = Node(neighbour[0], neighbour[1], g_cost, h_cost)
            new_node.parent = current_node

            pushed += 1
            heapq.heappush(open_heap, (new_node.f_cost(), pushed, new_node))

    return None  # No path found

Create a stage with obstacles (1) and free path (0).

In [4]:
def generate_stage(rows: int, cols: int, obs_prob = 0.2, rng = None):
  """Build a rows x cols stage of free cells (0) and obstacles (1).

  Exactly int(rows * cols * obs_prob) cells are turned into obstacles, at
  positions drawn without replacement.

  Args:
    rows: number of rows (must not be negative).
    cols: number of columns (must not be negative).
    obs_prob: fraction of cells that become obstacles, between 0 and 1.
    rng: optional object with a NumPy-style `choice` method (for example
      `np.random.default_rng(seed)`) used to place the obstacles; when
      omitted, the global `np.random` state is used.

  Returns:
    A 2-D integer NumPy array containing only 0s and 1s.

  Raises:
    ValueError: if a dimension is negative or `obs_prob` is outside [0, 1].
  """
  if rows < 0 or cols < 0:
    raise ValueError(f"rows and cols must be non-negative, got {rows} and {cols}")
  if not 0 <= obs_prob <= 1:
    raise ValueError(f"obs_prob must be between 0 and 1, got {obs_prob}")

  num_obstacles = int(rows * cols * obs_prob)
  stage = np.zeros((rows, cols), dtype=int)

  # Skip the draw when there is nothing to place (also covers an empty stage).
  if num_obstacles:
    sampler = np.random if rng is None else rng
    indices = sampler.choice(rows * cols, num_obstacles, replace=False)
    stage.flat[indices] = 1

  return stage

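Creates the agents, each with a random free start cell and a random free goal cell. Every agent begins with its own "explored" stage in which all cells are unexplored (-1) except the ones it can see; on the shared grid, agent positions are marked with 2.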

In [5]:
def generate_agents(real_stage, num_agents: int = 1):
  """Create up to `num_agents` agents on free cells of `real_stage`.

  Each agent gets a start cell that no earlier agent started on and a goal
  drawn from the free cells that are still unused as a start. Goals are not
  reserved, so several agents may share one.

  Args:
    real_stage: 2-D NumPy array in which 0 marks a free cell.
    num_agents: how many agents to create; values <= 0 are treated as 1.

  Returns:
    A list of Agent objects. It is shorter than `num_agents` (possibly
    empty) when the stage runs out of free cells.
  """
  if num_agents <= 0:
    num_agents = 1

  # Plain ints keep the start/goal tuples free of NumPy scalar types.
  free_cells = [(int(r), int(c)) for r, c in zip(*np.where(real_stage == 0))]

  agents = []
  for _ in range(num_agents):
    # One cell for the start and at least one more for the goal are needed;
    # drawing a goal from an empty list would raise IndexError.
    if len(free_cells) < 2:
      break
    start = random.choice(free_cells)
    free_cells.remove(start)
    goal = random.choice(free_cells)
    agents.append(Agent(start, goal, real_stage))

  return agents

Choose the direction with the most unexplored cells (max) to move in when the path-finding algorithm finds no path.

In [6]:
def choose_index(weights, ind_prob=0.1):
    """Pick a position in `weights`, usually that of the largest weight.

    With probability `ind_prob` a random position is returned instead, drawn
    from the entries that are neither the maximum nor -1. If no such entry
    exists, the first position holding the maximum is always returned.
    """
    best = max(weights)

    alternatives = []
    for position, weight in enumerate(weights):
        if weight == best or weight == -1:
            continue
        alternatives.append(position)

    # The random draw only happens when there is an alternative to take.
    if alternatives and random.random() < ind_prob:
        return random.choice(alternatives)

    return weights.index(best)

In [7]:
def rand_path(agent, conc_stage, ind_prob=0.1):
    """Propose a one-step move towards the side with the most unexplored cells.

    A direction is a candidate when the neighbouring cell exists and is 0 in
    the agent's real stage. Its weight is the number of -1 cells of
    `conc_stage` lying strictly on that side of the agent; every other
    direction gets weight -1. `choose_index` then picks among the weights.

    Returns:
        ((row, col), (new_row, new_col)); both cells are the current position
        when no direction is a candidate.
    """
    row, col = agent.x, agent.y
    stage = agent.real_stage
    explored = conc_stage.copy()

    # Parallel tables, ordered above / below / left / right.
    targets = [(row - 1, col), (row + 1, col), (row, col - 1), (row, col + 1)]
    inside = [row > 0, row < stage.shape[0] - 1, col > 0, col < stage.shape[1] - 1]
    sides = [np.s_[:row], np.s_[row + 1:], np.s_[:, :col], np.s_[:, col + 1:]]

    weights = []
    for target, in_bounds, side in zip(targets, inside, sides):
        if in_bounds and stage[target] == 0:
            weights.append(np.sum(explored[side] == -1))
        else:
            weights.append(-1)

    here = (row, col)
    if all(weight == -1 for weight in weights):
        return here, here  # boxed in: stay put

    index = choose_index(weights, ind_prob)

    # Fall back to staying in place if the chosen side is off the stage.
    destination = targets[index] if inside[index] else here
    return here, destination

Function to merge every agent's explored stage into one shared map (returns the coverage as a fraction between 0 and 1):

In [8]:
def update_total_explored(total_explored, agent):
  """Merge one agent's explored stage into the shared map, in place.

  Only cells that are still -1 in `total_explored` are overwritten, each with
  the agent's value for that cell.

  Returns:
    The fraction of cells in `total_explored` that are no longer -1.
  """
  unknown = total_explored == -1
  total_explored[unknown] = agent.explored_stage[unknown]

  n_rows, n_cols = total_explored.shape[0], total_explored.shape[1]
  still_unknown = np.sum(total_explored == -1)

  return 1 - (still_unknown / (n_rows * n_cols))

Function for testing astar:

In [9]:
def move_astar(start_grid, start_agents, random=False, ind_prob=0.1, max_episodes=100):
  """Run the agents towards their goals and report coverage.

  Works on deep copies, so `start_grid` and `start_agents` are left untouched.
  In every episode each unfinished agent plans with `a_star` on the shared
  grid (where other agents are marked 2) and takes the first step of the
  path. When no path exists and `random` is true, `rand_path` proposes the
  step instead.

  Args:
    start_grid: 2-D NumPy array of the stage.
    start_agents: list of Agent objects.
    random: use the `rand_path` fallback when A* finds no path. (The name
      shadows the `random` module inside this function only.)
    ind_prob: forwarded to `rand_path`.
    max_episodes: upper bound on the number of episodes.

  Returns:
    (coverage, finished, total_explored): the fraction of cells seen by at
    least one agent, the fraction of agents standing on their goal (0.0 when
    there are no agents), and the merged explored map.
  """
  grid = copy.deepcopy(start_grid)
  agents = copy.deepcopy(start_agents)

  for agent in agents:
    grid[agent.x, agent.y] = 2  # Mark initial agent positions

  total_explored = np.full(grid.shape, -1)
  total_expl_per = 0
  episode = 0

  # Agents still on the move. Kept separate from `agents` so finished agents
  # can be dropped without disturbing the final statistics.
  active = list(agents)

  while episode < max_episodes and any(not agent.check_goal() for agent in active):
    episode += 1
    # Iterate over a snapshot: removing from the list being looped over
    # would make the loop skip the agent after the removed one.
    for agent in list(active):
      if agent.check_goal():
        grid[agent.x, agent.y] = 0  # a finished agent no longer blocks its cell
        total_expl_per = update_total_explored(total_explored, agent)
        active.remove(agent)
        continue

      path = a_star(grid, (agent.x, agent.y), agent.goal)
      total_expl_per = update_total_explored(total_explored, agent)

      if random and path is None:
        path = rand_path(agent, total_explored, ind_prob)
      if not path:
        continue  # nowhere to go this episode

      target = tuple(path[1])
      # rand_path only consults the real stage, so it can propose a cell
      # another agent is standing on; stay put in that case.
      if target != (agent.x, agent.y) and grid[target] == 2:
        target = (agent.x, agent.y)

      grid[agent.x, agent.y] = 0  # Mark the old position as unoccupied
      agent.x, agent.y = target   # Update agent position
      agent.agent_view()
      grid[agent.x, agent.y] = 2  # Mark the new position as occupied by agent

  # Fold in what each agent saw on its last move; otherwise those cells
  # would be missing from the reported coverage.
  for agent in agents:
    total_expl_per = update_total_explored(total_explored, agent)

  if not agents:
    return total_expl_per, 0.0, total_explored

  num_finish_agents = sum(1 for agent in agents if agent.check_goal())
  return total_expl_per, num_finish_agents / len(agents), total_explored

Manual Example:

In [10]:
# Two hand-made 5x5 stages: the first leaves both agents a route to their
# goals, the second walls them in.
manual_stages = [
    ("LESS OBSTACLES STAGE", np.array([[0, 0, 1, 0, 0],
                                       [1, 0, 0, 0, 1],
                                       [1, 0, 0, 0, 1],
                                       [0, 0, 1, 0, 0],
                                       [0, 0, 0, 0, 0]])),
    ("MANY OBSTACLES STAGE", np.array([[0, 0, 1, 1, 0],
                                       [0, 1, 1, 0, 1],
                                       [0, 0, 1, 1, 1],
                                       [0, 1, 0, 0, 0],
                                       [0, 0, 1, 0, 0]])),
]

for title, grid in manual_stages:
  print(title)
  # Same setup once without and once with the random fallback.
  for label, use_random in (("No-Random", False), ("Random", True)):
    # Two agents crossing the stage diagonally in opposite directions.
    agents = [Agent((0, 0), (4, 4), grid), Agent((0, 4), (4, 0), grid)]
    res = move_astar(grid, agents, random=use_random)
    print(f"Total {label} Coverage Percentage {res[0]} / Agents Finished: {res[1]}")
    print(f"Final Stage\n{res[2]}")


LESS OBSTACLES STAGE
Total No-Random Coverage Percentage 1.0 / Agents Finished: 1.0
Final Stage
[[0 0 1 0 0]
 [1 0 0 0 1]
 [1 0 0 0 1]
 [0 0 1 0 0]
 [0 0 0 0 0]]
Total Random Coverage Percentage 1.0 / Agents Finished: 1.0
Final Stage
[[0 0 1 0 0]
 [1 0 0 0 1]
 [1 0 0 0 1]
 [0 0 1 0 0]
 [0 0 0 0 0]]
MANY OBSTACLES STAGE
Total No-Random Coverage Percentage 0.24 / Agents Finished: 0.0
Final Stage
[[ 0  0 -1  1  0]
 [ 0 -1 -1 -1  1]
 [-1 -1 -1 -1 -1]
 [-1 -1 -1 -1 -1]
 [-1 -1 -1 -1 -1]]
Total Random Coverage Percentage 0.52 / Agents Finished: 0.0
Final Stage
[[ 0  0 -1  1  0]
 [ 0  1 -1 -1  1]
 [ 0  0  1 -1 -1]
 [ 0  1 -1 -1 -1]
 [ 0 -1 -1 -1 -1]]


Testing with dynamically generated stages:

In [11]:
# Initialization parameters ==:
grid_rows, grid_columns = 10, 10
obs_prob = 0.5  # propability of objects.
num_agents = 3 # number of agents.
num_test = 20
# =============================

grid = generate_stage(grid_rows, grid_columns, obs_prob)

agents = generate_agents(real_stage = grid, num_agents = num_agents)

avg = []
avg_finish = []
for _ in range(num_test):
  res = move_astar(start_grid=grid, start_agents=agents, random=False)
  # print(res)
  avg.append(res[0])
  avg_finish.append(res[1])
print(f"Average No-Random Coverage Percentage: {sum(avg)/len(avg)} / Average Finished: {sum(avg_finish)/len(avg_finish)}")

# Random:
avg = []
avg_finish = []
for _ in range(num_test):
  res = move_astar(start_grid=grid, start_agents=agents, random=True)
  # print(res)
  avg.append(res[0])
  avg_finish.append(res[1])
print(f"Average Random Coverage Percentage: {sum(avg)/len(avg)} / Average Finished: {sum(avg_finish)/len(avg_finish)}")

Average No-Random Coverage Percentage: 0.14999999999999997 / Average Finished: 0.0
Average Random Coverage Percentage: 0.37849999999999995 / Average Finished: 0.0


**OLD VERSION** of the Agent Class:
```
class Agent:

  def __init__(self, x, y, direction, real_stage, explored_stage):
    self.x = x
    self.y = y
    self.direction = direction  # 'up', 'down', 'left', 'right'
    self.real_stage = real_stage  # contains obstacles and paths
    self.explored_stage = explored_stage  # explored stage (contains agents)

  def move_left(self):
    self.direction = "left"
    if self.y > 0:
      if self.real_stage[(self.x, self.y - 1)] or self.explored_stage[(self.x, self.y - 1)]:  # obs or other agent
        print("obs")
        self.refresh_agent_view()
        return
      self.explored_stage[(self.x, self.y)] = 0
      self.y -= 1
      self.explored_stage[(self.x, self.y)] = 2
      self.refresh_agent_view()

  def move_right(self):
    self.direction = 'right'
    if self.y < len(self.explored_stage[0]) - 1:
      if self.real_stage[(self.x, self.y + 1)] or self.explored_stage[(self.x, self.y + 1)]:  # obs or other agent
        print("obs")
        self.refresh_agent_view()
        return
      self.explored_stage[(self.x, self.y)] = 0
      self.y += 1
      self.explored_stage[(self.x, self.y)] = 2
      self.refresh_agent_view()

  def move_up(self):
    self.direction = 'up'
    if self.x > 0:
      if self.real_stage[(self.x - 1, self.y)] or self.explored_stage[(self.x - 1, self.y)]:  # obs or other agent
        print("obs")
        self.refresh_agent_view()
        return
      self.explored_stage[(self.x, self.y)] = 0
      self.x -= 1
      self.explored_stage[(self.x, self.y)] = 2
      self.refresh_agent_view()

  def move_down(self):
    self.direction = 'down'
    if self.x < len(self.explored_stage) - 1:
      if self.real_stage[(self.x + 1, self.y)] or self.explored_stage[(self.x + 1, self.y)]:  # obs or other agent
        print("obs")
        self.refresh_agent_view()
        return
      self.explored_stage[(self.x, self.y)] = 0
      self.x += 1
      self.explored_stage[(self.x, self.y)] = 2
      self.refresh_agent_view()

  def refresh_agent_view(self):
    # Generate agent view =======
    # Ex. If agent looks up -> knows both up, left and right (/w sensors).
    if self.direction == "up":
      if self.x > 0:
        if self.explored_stage[(self.x - 1, self.y)] < 0: # unexplored
          self.explored_stage[(self.x - 1, self.y)] = self.real_stage[(self.x - 1, self.y)]
      if self.y < len(self.real_stage[0]) - 1:
        if self.explored_stage[(self.x, self.y + 1)] < 0:
          self.explored_stage[(self.x, self.y + 1)] = self.real_stage[(self.x, self.y + 1)]
      if self.y > 0:
        if self.explored_stage[(self.x, self.y - 1)] < 0:
          self.explored_stage[(self.x, self.y - 1)] = self.real_stage[(self.x, self.y - 1)]

    elif self.direction == "down":
      if self.x < len(self.real_stage) - 1:
        if self.explored_stage[(self.x + 1, self.y)] < 0:
          self.explored_stage[(self.x + 1, self.y)] = self.real_stage[(self.x + 1, self.y)]
      if self.y < len(self.real_stage[0]) - 1:
        if self.explored_stage[(self.x, self.y + 1)] < 0:
          self.explored_stage[(self.x, self.y + 1)] = self.real_stage[(self.x, self.y + 1)]
      if self.y > 0:
        if self.explored_stage[(self.x, self.y - 1)] < 0:
          self.explored_stage[(self.x, self.y - 1)] = self.real_stage[(self.x, self.y - 1)]

    elif self.direction == "left":
      if self.y > 0:
        if self.explored_stage[(self.x, self.y - 1)] < 0:
          self.explored_stage[(self.x, self.y - 1)] = self.real_stage[(self.x, self.y - 1)]
      if self.x < len(self.real_stage) - 1:
        if self.explored_stage[(self.x + 1, self.y)] < 0:
          self.explored_stage[(self.x + 1, self.y)] = self.real_stage[(self.x + 1, self.y)]
      if self.x > 0:
        if self.explored_stage[(self.x - 1, self.y)] < 0:
          self.explored_stage[(self.x - 1, self.y)] = self.real_stage[(self.x - 1, self.y)]

    elif self.direction == "right":
      if self.y < len(self.real_stage[0]) - 1:
        if self.explored_stage[(self.x, self.y + 1)] < 0:
          self.explored_stage[(self.x, self.y + 1)] = self.real_stage[(self.x, self.y + 1)]
      if self.x < len(self.real_stage) - 1:
        if self.explored_stage[(self.x + 1, self.y)] < 0:
          self.explored_stage[(self.x + 1, self.y)] = self.real_stage[(self.x + 1, self.y)]
      if self.x > 0:
        if self.explored_stage[(self.x - 1, self.y)] < 0:
          self.explored_stage[(self.x - 1, self.y)] = self.real_stage[(self.x - 1, self.y)]

```


In [12]:
import numpy as np

# Scratch check of the slicing used to count unexplored cells on each side
# of the agent: -1 is unexplored, 2 is the agent.
grid = np.array([[0, 0, -1, -1, 1],
                 [0, 2, 0, -1, -1],
                 [-1, 0, -1, -1, -1],
                 [-1, -1, -1, -1, -1],
                 [-1, 0, -1, -1, -1]])

# Row and column of the first cell holding a 2.
coordinates = np.where(grid == 2)
x, y = (axis[0] for axis in coordinates)

unexplored = grid == -1

above_count = unexplored[:x].sum()          # rows before the agent's row
below_count = unexplored[x + 1:].sum()      # rows after the agent's row
right_count = unexplored[:, y + 1:].sum()   # columns after the agent's column
left_count = unexplored[:, :y].sum()        # columns before the agent's column

print(above_count, below_count, right_count, left_count)

2 13 13 3


In [13]:
# list.index returns the first match, so a tie for the maximum resolves to
# the lowest position.
l = [0, 1, 1, 0]
first_max_position = l.index(max(l))
print(first_max_position)

1
