# Anytime heuristic search 

In this lab, we will implement one of the seminal <i>anytime</i> heuristic search algorithms – ARA*.

The idea behind an anytime algorithm is "to provide the best possible solution, given a limited computational (time) budget". In our case, we will start by finding a bounded suboptimal solution with a high suboptimality bound and then gradually decrease the bound and resume the search. The crux of the algorithm is that it does not restart the search from scratch but rather reuses the previously built search tree to (significantly) save search effort. As we will see, a "smart" reuse of the search tree notably outperforms the straightforward approach where one straightforwardly invokes one WA* search after another from scratch, while decreasing the suboptimality bound.

For details on ARA*, please refer to the <a href=https://proceedings.neurips.cc/paper/2003/file/ee8fe9093fbbb687bef15a38facc44d2-Paper.pdf>original paper by Likhachev et al.</a>

As before, we will consider a pathfinding problem on an 8-connected grid with the Octile distance as the heuristic function.

In [2]:
import math
from random import shuffle
from heapq import heappop, heappush, heapify
from random import randint
import time
import os
from sys import float_info
import numpy as np
from typing import Tuple, List, Iterable, Callable, Type, Dict, Union, Optional, Generator
import numpy.typing as npt
import traceback
from textwrap import dedent

EPS = float_info.epsilon

## Grid Map Representation

In [3]:
class Map:
    """
    Represents a square grid environment for our moving agent.

    Attributes
    ----------
    _width : int
        The number of columns in the grid.

    _height : int
        The number of rows in the grid.

    _cells : np.ndarray
        A binary matrix representing the grid where 0 represents a traversable cell, and 1 represents a blocked cell.
    """

    def __init__(self, cells: npt.NDArray):
        """
        Initializes the map using a 2D array of cells.

        Parameters
        ----------
        cells : np.ndarray
            A binary matrix representing the grid. 0 indicates a traversable cell, and 1 indicates a blocked cell.
        """
        self._width = cells.shape[1]
        self._height = cells.shape[0]
        self._cells = cells

    def in_bounds(self, i: int, j: int) -> bool:
        """
        Checks if the cell (i, j) is within the grid boundaries.

        Parameters
        ----------
        i : int
            Row number of the cell in the grid.
        j : int
            Column number of the cell in the grid.

        Returns
        ----------
        bool
            True if the cell is inside the grid, False otherwise.
        """
        return 0 <= j < self._width and 0 <= i < self._height

    def traversable(self, i: int, j: int) -> bool:
        """
        Checks if the cell (i, j) is not an obstacle.

        Parameters
        ----------
        i : int
            Row number of the cell in the grid.
        j : int
            Column number of the cell in the grid.

        Returns
        ----------
        bool
            True if the cell is traversable, False if it's blocked.
        """
        return not self._cells[i, j]

    def get_neighbors(self, i: int, j: int) -> List[Tuple[int, int]]:
        """
        Gets a list of neighboring cells as (i, j) tuples.
        Assumes that the grid is 8-connected, allowing moves in cardinal and diagonal directions.

        Parameters
        ----------
        i : int
            Row number of the cell in the grid.
        j : int
            Column number of the cell in the grid.

        Returns
        ----------
        neighbors : List[Tuple[int, int]]
            List of neighboring cells.
        """
        neighbors = []
        delta = ((0, 1), (1, 0), (0, -1), (-1, 0))
        for dx, dy in delta:
            ni, nj = i + dx, j + dy
            if self.in_bounds(ni, nj) and self.traversable(ni, nj):
                neighbors.append((ni, nj))

        delta = [(1, 1), (1, -1), (-1, -1), (-1, 1)]
        for dx, dy in delta:
            ni, nj = i + dx, j + dy
            if (
                self.in_bounds(ni, nj)
                and self.traversable(ni, nj)
                and self.traversable(i, nj)
                and self.traversable(ni, j)
            ):
                neighbors.append((ni, nj))

        return neighbors

    def get_size(self) -> Tuple[int, int]:
        """
        Returns the size of the grid in cells.

        Returns
        ----------
        (height, width) : Tuple[int, int]
            Number of rows and columns in the grid.
        """
        return self._height, self._width

In [4]:
def convert_string_to_cells(cell_str: str) -> npt.NDArray:
    """
    Converts a string representation of a grid map, with '#' or '@' for obstacles and '.' for free cells, into a binary matrix.

    Parameters
    ----------
    cell_str : str
        String containing grid map information ('#' or '@' for obstacles and '.' for free cells).

    Returns
    ----------
    cells : np.ndarray
        Binary matrix representing the grid map.
    """
    lines = cell_str.replace(" ", "").split("\n")

    cells = np.array(
        [[1 if char == "#" or char == "@" else 0 for char in line] for line in lines if line],
        dtype=np.int8,
    )
    return cells

In [5]:
def compute_cost(i1, j1, i2, j2):
    """
    Computes the cost of simple moves between cells (i1, j1) and (i2, j2).

    Parameters
    ----------
    i1 : int
        Row number of the first cell in the grid.
    j1 : int
        Column number of the first cell in the grid.
    i2 : int
        Row number of the second cell in the grid.
    j2 : int
        Column number of the second cell in the grid.

    Raises
    ----------
    ValueError
        If trying to compute the cost of a non-supported move.
    """
    d = abs(i1 - i2) + abs(j1 - j2)
    if d == 1:  # Cardinal move
        return 1
    elif d == 2:  # Diagonal move
        return math.sqrt(2)
    else:
        raise ValueError("Trying to compute the cost of a non-supported move!")

## Search Node

In [6]:
class Node:
    """
    Represents a search node.

    Attributes
    ----------
    i : int
        Row coordinate of the corresponding grid element.
    j : int
        Column coordinate of the corresponding grid element.
    g : float | int
        g-value of the node.
    h : float | int
        h-value of the node
    f : float | int
        f-value of the node
    parent : Node
        Pointer to the parent node.
    w : float
        Weight value from the heuristic
    """

    def __init__(
        self,
        i: int,
        j: int,
        g: Union[float, int] = 0,
        h: Union[float, int] = 0,
        f: Optional[Union[float, int]] = None,
        parent: "Node" = None,
        w: Optional[float] = 1
    ):
        """
        Initializes a search node.

        Parameters
        ----------
        i : int
            Row coordinate of the corresponding grid element.
        j : int
            Column coordinate of the corresponding grid element.
        g : float | int
            g-value of the node.
        h : float | int
            h-value of the node (always 0 for Dijkstra).
        f : float | int
            f-value of the node (always equal to g-value for Dijkstra).
        parent : Node
            Pointer to the parent node.
        w : float
            Weight value from the heuristic (=1 if not provided)
        """
        self.i = i
        self.j = j
        self.g = g
        self.h = h
        self.w = w
        if f is None:
            self.f = self.g + self.w * self.h
        else:
            self.f = f
        self.parent = parent

    def update_weight(self, new_weight: float):
        """
        Changes the weight value of node and recaclulate it's f-value.
        """
        self.w = new_weight
        self.f = self.g + self.w * self.h

    def __eq__(self, other):
        """
        Checks if two search nodes are the same, which is needed to detect duplicates in the search tree.
        """
        return (self.i, self.j) == (other.i, other.j)

    def __hash__(self):
        """
        Makes the Node object hashable, allowing it to be used in sets/dictionaries.
        """
        return hash((self.i, self.j))

    def __lt__(self, other):
        """
        Compares the keys (i.e., the f-values) of two nodes, needed for sorting/extracting the best element from OPEN.
        """
        return self.f < other.f

## Implementing the Search Tree (i.e. OPEN,  CLOSED, INCONS)

Efficient implementation of the search tree (OPEN, CLOSED) is crucial for any search algorithm. You should use your efficient implementation of the search tree that you've used in the previous labs. 

It is also necessary to implement several new features and methods of the search tree class, which are required for ARA*.
 

In [7]:
class SearchTreePQD:
    """
    SearchTree using a priority queue for OPEN and a dictionary for CLOSED.
    """

    def __init__(self):
        self._open = []  # Priority queue for nodes in OPEN
        self._closed = dict()  # Dictionary for nodes in CLOSED (expanded nodes)
        self._incons = set()  # Set of the inconsistnet nodes
        self._visited = dict()  # Dict of nodes (e.g. key = (i, j), value = node).
        # This is needed to obtain a g/f value of an arbitrary node in the search tree
        self._enc_open_dublicates = 0  # Number of dublicates encountered in OPEN
        self.min_g_plus_h_value = np.inf

    def __len__(self) -> int:
        """
        Returns the size of the search tree. Useful for assessing the memory
        footprint of the algorithm, especially at the final iteration.
        """
        return len(self._open) + len(self._closed) + len(self._incons)

    def open_is_empty(self) -> bool:
        """
        Checks if OPEN is empty.
        If true, the main search loop should be interrupted.
        """
        return len(self._open) == 0

    def add_to_open(self, item: Node):
        """
        Adds a node to the search tree, specifically to OPEN. This node is either
        entirely new or a duplicate of an existing node in OPEN.
        This implementation detects duplicates lazily; thus, nodes are added to
        OPEN without initial duplicate checks.
        """
        heappush(self._open, item)

    def add_to_visited(self, item: Node):
        """
        Adds a node to the VISITED dictionary after the first visit.
        """
        if not (item.i,item.j) in self._visited:
            self._visited[(item.i,item.j)] = item

    def get_best_node_from_open(self) -> Optional[Node]:
        """
        Retrieves the best node from OPEN, defined by the minimum key.
        This node will then be expanded in the main search loop.

        Duplicates are managed here. If a node has been expanded previously
        (and is in CLOSED), it's skipped and the next best node is considered.

        Returns None if OPEN is empty.
        """
        while True:
            if not self._open:
                return None
            
            best_node = heappop(self._open)
            
            if self.was_expanded(best_node): # node was expanded
                self._enc_open_dublicates += 1
                continue

            return best_node

    def add_to_closed(self, item: Node):
        """
        Adds a node to the CLOSED dictionary.
        """
        self._closed[item] = item

    def was_expanded(self, item: Node) -> bool:
        """
        Checks if a node has been previously expanded.
        """
        return item in self._closed

    def add_to_incons(self, item):
        """
        This is required solely for ARA*.
        If we generate a node that has already been expanded, BUT the newly generated node has a superior g-value,
        we should not add this generated node to INCONS. While this node won't be utilized to continue expanding
        the search tree during the current iteration of the main search loop, it will serve as the frontier node
        in the next iteration of ARA*'s main search loop.
        """
        self._incons.add(item)

    def unite_update_open_incons(self, new_weight):
        """
        This is required solely for ARA*.
        It combines the OPEN and INCONS sets and updates the f-values of nodes based on the new suboptimality bound.
        """
        new_open = []
        for node in self._open + list(self._incons):
            node.update_weight(new_weight)
            new_open.append(node)
            self.min_g_plus_h_value = min(node.g + node.h, self.min_g_plus_h_value)
            
        self._open = new_open

    def clear_closed(self):
        """
        This is required solely for ARA*.
        'Closed' should be refreshed before the conclusion of each main search loop iteration.
        The 'closed part' of the search tree will be retained in memory between the main search loop iterations
        within the VISITED container.
        """
        self._closed.clear()

    def clear_incons(self):
        """
        This is required solely for ARA*.
        'Incons' should be refreshed before the conclusion of each main search loop iteration.
        """
        self._incons.clear()

    def get_node(self, i: int, j: int) -> Node:
        """
        Checks if the node was generated (is part of the search tree). If it was, then this node is returned,
        specifically — the version of this node (since there might be duplicates) with the best g-value.
        If it wasn't generated, a 'bulk' node with an 'infinite' g-value is returned.
        """
        return self._visited[(i,j)] if (i,j) in self._visited else Node(i, j, g=np.inf)

    @property
    def opened(self):
        return self._open

    @property
    def expanded(self):
        return self._closed

    @property
    def number_of_open_dublicates(self):
        return self._enc_open_dublicates

## Weighted A*

In [8]:
def diagonal_distance(i1, j1, i2, j2):
    """
    Computes the Diagonal distance between two cells on a grid.

    Parameters
    ----------
    i1, j1 : int
        (i, j) coordinates of the first cell on the grid.
    i2, j2 : int
        (i, j) coordinates of the second cell on the grid.

    Returns
    -------
    int
        Diagonal distance between the two cells.
    """
    dx = abs(i1 - i2)
    dy = abs(j1 - j2)
    card_cost = 1
    diag_cost = math.sqrt(2)
    return card_cost * max(dx, dy) + (diag_cost - card_cost) * min(dx, dy)

In [9]:
def weighted_astar(
    task_map: Map,
    start_i: int,
    start_j: int,
    goal_i: int,
    goal_j: int,
    weight: float,
    heuristic_func: Callable,
    search_tree: Type[SearchTreePQD],
) -> Tuple[bool, Optional[Node], int, int, Optional[Iterable[Node]], Optional[Iterable[Node]]]:
    """
    Implements the Weighted A* search algorithm.

    Parameters
    ----------
    task_map : Map
        The grid or map being searched.
    start_i, start_j : int, int
        Starting coordinates.
    goal_i, goal_j : int, int
        Goal coordinates.
    weight : float
        The weight of heuristics in F-value computation.
    heuristic_func : Callable
        Heuristic function for estimating the distance from a node to the goal.
    search_tree : Type[SearchTreePQD]
        The search tree to use.

    Returns
    -------
    Tuple[bool, Optional[Node], int, int, Optional[Iterable[Node]], Optional[Iterable[Node]]]
        Tuple containing:
        - A boolean indicating if a path was found.
        - The last node in the found path or None.
        - Number of algorithm iterations.
        - Size of the resultant search tree.
        - OPEN set nodes for visualization or None.
        - CLOSED set nodes.
    """
    
    ast = search_tree()
    steps = 0

    start_node = Node(start_i, start_j, g=0, h=heuristic_func(start_i, start_j, goal_i, goal_j))
    ast.add_to_open(start_node)

    while not ast.open_is_empty():
        steps += 1
        current_node = ast.get_best_node_from_open()

        # in lazy impementation we may expand all nodes, but duplicates are still exists
        if current_node is None:
            break

        if (current_node.i, current_node.j) == (goal_i, goal_j):
            return True, current_node, steps, len(ast), ast.opened, ast.expanded
        
        for neighbor_i, neighbor_j in task_map.get_neighbors(current_node.i, current_node.j):
            neighbor_g = current_node.g + compute_cost(current_node.i, current_node.j, neighbor_i, neighbor_j)
            neighbor_h = heuristic_func(neighbor_i, neighbor_j, goal_i, goal_j)  
            neighbor_node = Node(neighbor_i, neighbor_j, g=neighbor_g, h=neighbor_h, w=weight, parent=current_node)
            
            if not ast.was_expanded(neighbor_node):
                ast.add_to_open(neighbor_node)
        
        ast.add_to_closed(current_node)
  
    return False, None, steps, len(ast), None, ast.expanded

## Naive Anytime A*

Straighforward implementation of anytime A* which sequientially invocates WA* (from scratch, without saving the search tree in between) with the decreasing suboptimality bound.

The initial suboptimality bound and the increment are the input parameters of the algorithm (start_w, step_w, respectively).


In [10]:
def naive_anytime_astar(
    task_map: Map,
    start_i: int,
    start_j: int,
    goal_i: int,
    goal_j: int,
    start_w: float = 3.0,
    step_w: float = 0.5,
    heuristic_func: Optional[Callable] = None,
    search_tree=None,
) -> Generator[Tuple[bool, Node, int, float], None, None]:
    """
    Repeatedly runs weighted A* search algorithm without re-expansion on any domain,
    decreasing current weight from start_w to 1.0 by step_w.

    Parameters
    ----------
    grid_map : Map
        The grid or map being searched.
    start_i, start_j : int, int
        Starting coordinates.
    goal_i, goal_j  : int, int
        The goal state of search in useful for your implementation form.
    start_w : float
        The initial weight of heuristics in F-value computation. Must be greater or equal to 1.0, by default 3.0.
    step_w : float
        The value by which the weight will be reduced, if it is provided by the algorithm, by default 0.5.
    heuristic_func : Callable | None
        Heuristic function for estimating the distance from a node to the goal.
    search_tree : Type[SearchTreePQD]
        The search tree to use.

    Yields
    -------
    path_found : bool
        Path was found or not.
    last_node : Node
        The last node in path. None if path was not found.
    steps : int
        The number of search steps
    weight : float
        Weight used at iteration
    """

    if start_w < 1:
        raise Exception("In ANYA* initial weight must be greater or equal to 1")

    weight = start_w
    steps = 0

    # WA* is complete algoruthm, so we don't need to check the 'weight >= 1' condition 
    while True:
        path_found, last_node, iter_steps, _, _, _ = weighted_astar(task_map, start_i, start_j, goal_i, goal_j, weight, heuristic_func, search_tree)
        yield path_found, last_node, iter_steps, weight 
        
        steps += iter_steps
        weight -= step_w

## Anytime Repairing A*

Anytime Repairing A* is a sophisticated variant of anytime A*. It sequentially invokes WA* with a decreasing suboptimality bound while preserving the search tree in between. Each new search begins with a specially formed frontier, which is composed of nodes that were either OPEN or INCONS at the conclusion of the previous iteration (CLOSED is cleared between the main search loops).

The initial suboptimality bound and the increment are the input parameters of the algorithm (start_w and step_w, respectively).

Following the notation of the original ARA* paper, the main search loop is encapsulated in a function named improve_path. This function represents a tailored adaptation of the WA* main search loop, with a particular focus on the handling of INCONS nodes.

The function anytime_repairing_astar implements the overarching logic of the algorithm. It establishes the search tree initially, calls improve_path, merges OPEN and INCONS (and then clears both OPEN and CLOSED) post the conclusion of improve_path, and subsequently reduces the suboptimality bound, among other operations.

In [11]:
def improve_path(goal_i, goal_j, grid_map, heuristic_func, search_tree: SearchTreePQD, weight
    ) -> Tuple[bool, Optional[Node], int]:
    steps = 0
    while not search_tree.open_is_empty():
        steps += 1
        current_node = search_tree.get_best_node_from_open()

        # in lazy impementation we may expand all nodes, but duplicates are still exists
        if current_node is None:
            break

        if (current_node.i, current_node.j) == (goal_i, goal_j):
            return True, current_node, steps
        
        if current_node > search_tree.get_node(goal_i, goal_j):
            return False, None, steps
        
        for neighbor_i, neighbor_j in grid_map.get_neighbors(current_node.i, current_node.j):
            neighbor_g = current_node.g + compute_cost(current_node.i, current_node.j, neighbor_i, neighbor_j)
            neighbor_h = heuristic_func(neighbor_i, neighbor_j, goal_i, goal_j)  
            neighbor_node = Node(neighbor_i, neighbor_j, g=neighbor_g, h=neighbor_h, w=weight, parent=current_node)
            
            if search_tree.get_node(neighbor_i, neighbor_g).f == np.inf: # node was not visited
                if not search_tree.was_expanded(neighbor_node):
                    search_tree.add_to_open(neighbor_node)
                else:
                    search_tree.add_to_incons(neighbor_node)    

        search_tree.add_to_closed(current_node)

    return False, None, steps  # path_found, goal_node, iter_steps


def anytime_repairing_astar(
    task_map: Map,
    start_i: int,
    start_j: int,
    goal_i: int,
    goal_j: int,
    start_w: float = 3.0,
    step_w: float = 0.5,
    heuristic_func: Optional[Callable] = None,
    search_tree: Optional[SearchTreePQD] = None,
) -> Generator[Tuple[bool, Node, int, float], None, None]:
    """
    Runs Anytime Repairing A* search algorithm.

    Parameters
    ----------
    grid_map : Map
        The grid or map being searched.
    start_i, start_j : int, int
        Starting coordinates.
    goal_i, goal_j  : int, int
        The goal state of search in useful for your implementation form.
    start_w : float
        The initial weight of heuristics in F-value computation. Must be greater or equal to 1.0, by default 3.0.
    step_w : float
        The value by which the weight will be reduced, if it is provided by the algorithm, by default 0.5.
    heuristic_func : Callable | None
        Heuristic function for estimating the distance from a node to the goal.
    search_tree : Type[SearchTreePQD]
        The search tree to use.

    Yields
    -------
    path_found : bool
        Path was found or not.
    last_node : Node
        The last node in path. None if path was not found.
    steps : int
        The number of search steps
    weight : float
        Weight used at iteration
    """

    if start_w < 1:
        raise Exception("In ARA* initial weight must be greater or equal to 1")

    weight = float(start_w)

    start_node = Node(start_i, start_j, g=0, h=heuristic_func(start_i, start_j, goal_i, goal_j))
    arast = search_tree()
    arast.add_to_open(start_node)
    
    while True:
        path_found, goal_node, iter_steps = improve_path(goal_i, goal_j, task_map, heuristic_func, arast, weight)
        yield path_found, goal_node, iter_steps, weight

        weight -= step_w
        arast.unite_update_open_incons(weight)
        arast.clear_closed()
        arast.clear_incons()
        weight = min(weight, goal_node.g/arast.min_g_plus_h_value)

# Experiment

In [12]:
def make_path(goal: Node) -> Tuple[List[Node], Union[float, int]]:
    """
    Creates a path by tracing parent pointers from the goal node to the start node.
    It also returns the path's length.

    Parameters
    ----------
    goal : Node
        Pointer to the goal node in the search tree.

    Returns
    -------
    Tuple[List[Node], float]
        Path and its length.
    """
    length = goal.g
    current = goal
    path = []
    while current.parent:
        path.append(current)
        current = current.parent
    path.append(current)
    return path[::-1], length

In [13]:
def simple_test(search_generator, task: Optional[int], start_w: float, step_w: float, max_time: float, *args):
    """
    Function `simple_test` runs search_generator on one task (use a number from 0 to 4 to
    choose a specific debug task on simple map or None to select a random task
    from this pool).

    Parameters
    ----------
    search_generator : Generator
        Implementation of the anytime search method.
    task : int | None
        A number from 0 to 4 to choose a specific debug task on a simple map,
        or None to select a random task from this pool.
    start_w : float
        The initial weight of heuristics in F-value computation. Must be greater or equal to 1.0
    step_w : float
        The value by which the weight will be reduced, if it is provided by the algorithm
    max_time : float
        The amount of time given for the pathfinding procedure. Set in seconds.
    *args
        Additional arguments passed to the search generator.
    """

    def get_map_data():
        map_str = dedent(
            """
            . . . # # . . . . . . . . # # . . . # . . # # . . . . . . .  
            . . . # # # # # . . # . . # # . . . . . . # # . . . . . . . 
            . . . . . . . # . . # . . # # . . . # . . # # . . . . . . . 
            . . . # # . . # . . # . . # # . . . # . . # # . . . . . . . 
            . . . # # . . # . . # . . # # . . . # . . # # . . . . . . . 
            . . . # # . . # . . # . . # # . . . # . . # # # # # . . . . 
            . . . # # . . # . . # . . # # . . . # . . # # # # # . . . . 
            . . . . . . . # . . # . . # # . . . # . . # . . . . . . . . 
            . . . # # . . # . . # . . # # . . . # . . # . . . . . . . . 
            . . . # # . . # . . # . . # # . . . # . . # . . . . . . . . 
            . . . # # . . . . . # . . . . . . . # . . . . . . . . . . . 
            . . . # # # # # # # # # # # # # . # # . # # # # # # # . # # 
            . . . # # . . . . . . . . # # . . . . . . . . . . . . . . . 
            . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
            . . . # # . . . . . . . . # # . . . . . . . . . . . . . . .
            """
        )
        cells = convert_string_to_cells(map_str)
        return Map(cells)

    task_map = get_map_data()
    starts = [(0, 1), (6, 2), (5, 6), (13, 7), (4, 23)]
    goals = [(1, 28), (2, 29), (3, 20), (3, 20), (0, 0)]

    if (task is None) or not (0 <= task < 5):
        task = randint(0, 4)

    start = Node(*starts[task])
    goal = Node(*goals[task])

    true_result = weighted_astar(task_map, start.i, start.j, goal.i, goal.j, 1, *args)
    path = make_path(true_result[1])
    length = path[1]
    print("Optimal path was found by A*! Length: {:.7f}".format(path[1]))

    start_time = time.time()
    last_result = False, None

    iter_count = 0
    timer = 0.0
    finale_timer = 0.0
    weight_real = 0.0
    for result in search_generator(task_map, start.i, start.j, goal.i, goal.j, start_w, step_w, *args):
        iter_count += 1
        timer = time.time() - start_time

        if timer >= max_time:
            break

        last_result = result
        finale_timer = timer
        if last_result[0]:
            weighted_length = last_result[1].g
            weight_real = weighted_length / length
            weight_used = last_result[-1]
            print(f"{iter_count} iter (w = {weight_used:.3f}). Real sub-optimality value: {weight_real:.3f}")

            if abs(weight_real - 1) < EPS:
                break
        else:
            print(f"{iter_count} iter. Weighted path not found!")

    if last_result[0]:
        weighted_length = last_result[1].g
        weight_used = weighted_length / length
        print(
            f"Weighted path found! Final sub-optimality value: {weight_used:.7f}. Iterations: {iter_count}. Time: {finale_timer:.7}"
        )
    else:
        print("Weighted path not found!")

In [14]:
def read_map_from_movingai_file(
    path: str,
) -> Tuple[npt.NDArray, int, int, int, int, float]:
    """
    Reads map from a MovingAI .map file.

    Parameters
    ----------
    path : str
        Path to a file with the map.

    Returns
    -------
    cells : npt.NDArray
        Matrix of grid map cells.
    """
    with open(path) as map_file:
        next(map_file)
        height = int(next(map_file).split()[1])
        width = int(next(map_file).split()[1])
        next(map_file)

        # Read the map section
        map_lines = [next(map_file) for _ in range(height)]
        map_str = "".join(map_lines)
        cells = convert_string_to_cells(map_str)
    return cells

In [15]:
def read_tasks_from_movingai_file(path):
    """
    Reads tasks from MovingAI .scen file

    Parameters
    ----------
    path : str
        Path to file with task

    Returns
    -------
    List[Tuple[int, int, int, int, float]]
        List of tasks. Each element of list contains:
        - start_i, start_j -- coordinates of start cell
        - goal_i, goal_j -- coordinates of goal cell
        - length -- length of shortest path
    """
    tasks = []
    with open(path) as tasks_file:
        next(tasks_file)
        for task_line in tasks_file:
            task_data = task_line.split()
            start_j = int(task_data[4])
            start_i = int(task_data[5])
            goal_j = int(task_data[6])
            goal_i = int(task_data[7])
            length = float(task_data[8])
            tasks.append((start_i, start_j, goal_i, goal_j, length))
    return tasks

In [16]:
def massive_test(search_generator: Generator, start_w: float, step_w: float, max_time: float, *args):
    """
    The `massive_test` function runs the `search_generator` on a set of different tasks. For every task, it displays a short report:
     - 'Weighted path found!' along with some statistics if a path was found.
     - 'Weighted path not found!' if a path wasn't found.
     - 'Execution error' if an error occurred during the execution of the search_function.

    Parameters
    ----------
    search_generator : generator
        Implementation of the anytime search method.
    start_w : float
        The initial weight of heuristics in F-value computation. Must be greater or equal to 1.0
    step_w : float
        The value by which the weight will be reduced, if it is provided by the algorithm
    max_time : float
        The amount of time given for the pathfinding procedure. Set in seconds.
    *args
        Additional arguments passed to the search function.
    Returns
    -------
    stat : Dict[List[bool], List[bool], List[float], List[float], List[float]]
        Statistics about conducted test represented in form of dictionary with next key-values:
        - found -- list containing the result of finding the path in corresponding tasks
        - correct -- list containing the results of checking the coincidence of found path length and the true value of the length in corresponding tasks
        - length -- list containing lengths of found paths in corresponding tasks
        - weight_real -- list containing the ratios of the found paths lengths to the values of the shortest paths lengths
        - weight_used -- list of the last used weights
        - iter -- list containing the number of iterations
    """

    stat = {
        "found": [],
        "correct": [],
        "length": [],
        "weight_real": [],
        "weight_used": [],
        "iter": [],
    }

    task_from = 850
    task_to = 900
    directory = "data"
    maps = ["32room_000.map"]

    for map_file in maps:
        map_file_path = os.path.join(directory, map_file)
        cells = read_map_from_movingai_file(map_file_path)
        task_file_path = map_file_path + ".scen"
        tasks = read_tasks_from_movingai_file(task_file_path)
        task_map = Map(cells)

        for task_n in range(task_from, task_to):
            (start_i, start_j, goal_i, goal_j, true_length) = tasks[task_n]

            start_time = time.time()
            last_result = False, None

            iter_count = 0
            timer = 0.0
            finale_timer = 0.0
            weight_real = 0.0

            for result in search_generator(task_map, start_i, start_j, goal_i, goal_j, start_w, step_w, *args):
                iter_count += 1
                timer = time.time() - start_time

                if timer >= max_time:
                    iter_count -= 1
                    break

                last_result = result
                finale_timer = timer
                weight_used = last_result[-1]
                if last_result[0]:
                    weighted_length = last_result[1].g
                    weight_real = weighted_length / true_length
                    if abs(weight_real - 1.0) < EPS:
                        break
                else:
                    print(f"Weighted path not found on iteration {iter_count-1} with weight {weight_used:.3}")

            if last_result[0]:
                weighted_length = last_result[1].g
                weight_real = weighted_length / true_length
                print(
                    "Weighted path found! Final sub-optimality value: {:.7f}. Iterations: {:d}. Time: {:.7}".format(
                        weight_real, iter_count, finale_timer
                    )
                )

                stat["found"].append(True)
                stat["correct"].append(abs(weight_real - 1) < 0.00001)
                stat["length"].append(weighted_length)
                stat["weight_real"].append(weight_real)
                stat["weight_used"].append(last_result[-1])
                stat["iter"].append(iter_count)
            else:
                print("Weighted path not found!")
                stat["found"].append(False)
                stat["correct"].append(False)
                stat["length"].append(None)
                stat["weight_real"].append(None)
                stat["weight_used"].append(None)
                stat["iter"].append(1)

    return stat

# Tests

The following two cells run Naive Anytime A* and ARA* on the same 'simple' tasks and report the statistics on how many search iterations were made and what was the resultant suboptimality bound of the reported solution.

By default we start with subotimality bound of 3, decrement it by 0.1 in between each iteration of the search and allocate 0.02 seconds to report a solution.


Indeed, ARA* should generally i) complete more search iterations compared to Naive Anytime A* within the given time limit, ii) end with a better solutions.

In [17]:
for i in range(5):
    simple_test(naive_anytime_astar, i, 3.0, 0.1, 0.02, diagonal_distance, SearchTreePQD)

Optimal path was found by A*! Length: 47.8994949
1 iter (w = 3.000). Real sub-optimality value: 1.351
2 iter (w = 2.900). Real sub-optimality value: 1.351
3 iter (w = 2.800). Real sub-optimality value: 1.351
4 iter (w = 2.700). Real sub-optimality value: 1.351
5 iter (w = 2.600). Real sub-optimality value: 1.351
Weighted path found! Final sub-optimality value: 1.3513279. Iterations: 6. Time: 0.01657939
Optimal path was found by A*! Length: 40.8994949
1 iter (w = 3.000). Real sub-optimality value: 1.469
2 iter (w = 2.900). Real sub-optimality value: 1.469
3 iter (w = 2.800). Real sub-optimality value: 1.469
4 iter (w = 2.700). Real sub-optimality value: 1.469
5 iter (w = 2.600). Real sub-optimality value: 1.469
Weighted path found! Final sub-optimality value: 1.4687484. Iterations: 6. Time: 0.01704597
Optimal path was found by A*! Length: 38.2426407
1 iter (w = 3.000). Real sub-optimality value: 1.231
2 iter (w = 2.900). Real sub-optimality value: 1.231
3 iter (w = 2.800). Real sub-opti

In [18]:
for i in range(5):
    simple_test(anytime_repairing_astar, i, 3.0, 0.1, 0.02, diagonal_distance, SearchTreePQD)

Optimal path was found by A*! Length: 47.8994949
1 iter (w = 3.000). Real sub-optimality value: 1.351
2 iter (w = 2.312). Real sub-optimality value: 1.381
3 iter (w = 2.212). Real sub-optimality value: 1.381
4 iter (w = 2.112). Real sub-optimality value: 1.369
5 iter (w = 2.012). Real sub-optimality value: 1.369
6 iter (w = 1.912). Real sub-optimality value: 1.381
7 iter (w = 1.812). Real sub-optimality value: 1.369
8 iter (w = 1.712). Real sub-optimality value: 1.381
9 iter (w = 1.612). Real sub-optimality value: 1.148
10 iter (w = 1.512). Real sub-optimality value: 1.165
11 iter (w = 1.412). Real sub-optimality value: 1.177
12 iter (w = 1.312). Real sub-optimality value: 1.165
Weighted path found! Final sub-optimality value: 1.1649181. Iterations: 13. Time: 0.01984739
Optimal path was found by A*! Length: 40.8994949
1 iter (w = 3.000). Real sub-optimality value: 1.469
2 iter (w = 1.959). Real sub-optimality value: 1.503
3 iter (w = 1.859). Real sub-optimality value: 1.489
4 iter (w =

The following cell runs a more involved test, i.e. the one that uses a larger map and more instances on this map (as defined in massive_test function).

One should start with subotimality bound of 10, decrement it by 0.1 in between each iteration of the search and allocate 1.0 seconds to report a solution.

NOTE: This test takes ~2 mins to run on my laptop (2022 Asus ExpertBook equipped with Intel Core i7).

As before ARA* should generally i) complete more search iterations compared to Naive Anytime A* within the given time limit, ii) end with a better solutions. This can be verified in the last cell of the notebook.

In [19]:
%%time
wa_res = massive_test(naive_anytime_astar, 10.0, 0.1, 1.0, diagonal_distance, SearchTreePQD)

Weighted path found! Final sub-optimality value: 1.2719784. Iterations: 11. Time: 0.970674
Weighted path found! Final sub-optimality value: 1.3132091. Iterations: 26. Time: 0.9744561
Weighted path found! Final sub-optimality value: 1.1801959. Iterations: 23. Time: 0.98751
Weighted path found! Final sub-optimality value: 1.2484793. Iterations: 6. Time: 0.9688296
Weighted path found! Final sub-optimality value: 1.3702344. Iterations: 15. Time: 0.9760489
Weighted path found! Final sub-optimality value: 1.2658099. Iterations: 25. Time: 0.9943848
Weighted path found! Final sub-optimality value: 1.3704302. Iterations: 16. Time: 0.9941816
Weighted path found! Final sub-optimality value: 1.2021082. Iterations: 18. Time: 0.9645035
Weighted path found! Final sub-optimality value: 1.1946363. Iterations: 22. Time: 0.9782748
Weighted path found! Final sub-optimality value: 1.4346429. Iterations: 23. Time: 0.9938021
Weighted path found! Final sub-optimality value: 1.5132720. Iterations: 17. Time: 0.

In [20]:
%%time
ara_res = massive_test(anytime_repairing_astar, 10.0, 0.1, 1, diagonal_distance, SearchTreePQD)

Weighted path found! Final sub-optimality value: 1.0858355. Iterations: 4. Time: 0.4772215
Weighted path found! Final sub-optimality value: 1.0126425. Iterations: 6. Time: 0.982434
Weighted path found! Final sub-optimality value: 1.0211846. Iterations: 4. Time: 0.3554552
Weighted path found! Final sub-optimality value: 1.2509065. Iterations: 5. Time: 0.6986008
Weighted path found! Final sub-optimality value: 1.0327745. Iterations: 5. Time: 0.6676242
Weighted path found! Final sub-optimality value: 1.0114372. Iterations: 5. Time: 0.7480171
Weighted path found! Final sub-optimality value: 1.0075390. Iterations: 6. Time: 0.7728753
Weighted path found! Final sub-optimality value: 1.0312357. Iterations: 4. Time: 0.8793263
Weighted path found! Final sub-optimality value: 1.0024128. Iterations: 5. Time: 0.7923675
Weighted path found! Final sub-optimality value: 1.0649691. Iterations: 6. Time: 0.4454274
Weighted path found! Final sub-optimality value: 1.0098627. Iterations: 5. Time: 0.6671836


In [21]:
wa_w_real = np.array(wa_res["weight_real"])
ara_w_real = np.array(ara_res["weight_real"])

wa_w_used = np.array(wa_res["weight_used"])
ara_w_used = np.array(ara_res["weight_used"])

wa_iter = np.array(wa_res["iter"])
ara_iter = np.array(ara_res["iter"])


wa_name = "WA*"
ara_name = "ARA*"
print("Average real sub-optimality value")
print("--------------------------------------")
print(f"{wa_name:10} {wa_w_real.mean():.04} | {ara_name:10} {ara_w_real.mean():.04}")
print("--------------------------------------")
print()
print("Average last used sub-optimality value")
print("--------------------------------------")
print(f"{wa_name:10} {wa_w_used.mean():.04} | {ara_name:10} {ara_w_used.mean():.04}")
print("--------------------------------------")
print()
print("Average iteration count")
print("--------------------------------------")
print(f"{wa_name:10} {wa_iter.mean():.04} | {ara_name:10} {ara_iter.mean():.04}")
print("--------------------------------------")

Average real sub-optimality value
--------------------------------------
WA*        1.327 | ARA*       1.052
--------------------------------------

Average last used sub-optimality value
--------------------------------------
WA*        8.016 | ARA*       1.196
--------------------------------------

Average iteration count
--------------------------------------
WA*        20.84 | ARA*       4.84
--------------------------------------
