### Libraries

In [2]:
import import_ipynb
from CppAlgorithms import get_state_cells, rotate_points_partial, get_motion_cells

import math
import traceback
import itertools
from heapq import heappop, heappush, heapify
from pathlib import Path
from typing import Callable, Dict, Iterable, List, Optional, Tuple, Type, Union

### Map generation
Using a map with obstacles (rectangles in R^2), construct a search graph on which A* can be run.

In [3]:
class Map:
    """
    Search graph over discretized angle configurations of a manipulator.

    A configuration is a sequence of angle indices, one per echelon; index ``k``
    stands for the angle ``k * 2 * pi / n``. Two configurations are neighbors
    when exactly one index differs by one step (modulo ``n``) and the cells
    swept by that step do not hit an obstacle.
    """

    def __init__(self, n: int, echelon_points, obstacles=None):
        """
        Initialize the map.

        Arguments:
        1) The number of possible directions (divides the circle into n parts)
        2) List of coordinates of the manipulator's vertices
        3) List of coordinates of obstacles (defaults to no obstacles)
        """
        # Every angle index a single echelon can take.
        possible_angles = range(n)
        # All configurations: one angle index per echelon point,
        # i.e. n ** len(echelon_points) tuples.
        self.angles = set(itertools.product(possible_angles, repeat=len(echelon_points)))
        self.echelon_points = echelon_points
        # Create a fresh list per instance; a literal `[]` default would be
        # shared by every Map constructed without obstacles.
        self.obstacles = [] if obstacles is None else obstacles

    def get_neighbors(self, n: int, current_angles: List[float]) -> List[List[float]]:
        """
        Returns the neighboring nodes for a given node with an arbitrary number of angles.

        Arguments:
            n: The number of possible directions (divides the circle into n parts).
            current_angles: Angle indices of the node being expanded, one per echelon.

        Returns:
            A list of angle-index lists, each differing from ``current_angles`` in
            exactly one position by +1 or -1 (modulo n), for which the cells
            reported by ``get_motion_cells`` contain no obstacle.
        """
        echelons_amount = len(current_angles)
        neighbors = []

        # Rotate echelon points according to the current angles
        rotated_points = self.echelon_points[:]
        for echelon_number in range(echelons_amount):
            rotated_points = rotate_points_partial(
                rotated_points, echelon_number, current_angles[echelon_number] * 2 * math.pi / n
            )

        # The obstacle set does not depend on the candidate move, so build it once.
        obstacles_set = {tuple(obstacle) for obstacle in self.obstacles}

        # Check all neighbors: each echelon may step one index in either direction.
        for echelon_number in range(echelons_amount):
            for delta in (1, -1):
                new_angles = list(current_angles)
                new_angles[echelon_number] = (new_angles[echelon_number] + delta) % n  # Normalize angle

                # Get intersected points for the movement
                intersected_points = get_motion_cells(rotated_points, echelon_number, delta * 2 * math.pi / n)
                intersected_points_set = {tuple(point) for point in intersected_points}

                # Keep the move only if there's no collision with obstacles
                if intersected_points_set.isdisjoint(obstacles_set):
                    neighbors.append(new_angles)

        return neighbors


### code for A* 

In [4]:
class Node:
    """
    Search-tree node identified by its list of angle indices.

    Identity (equality and hashing) depends only on ``angles``; ordering
    depends only on ``f``, which lets nodes be stored directly in a heap.
    """

    def __init__(
        self,
        angles: List[int],  # List of angle indices
        g: Union[float, int] = 0,
        h: Union[float, int] = 0,
        f: Optional[Union[float, int]] = None,
        parent: Optional["Node"] = None,
    ):
        """
        Initialize the node for search.

        Arguments:
            angles (List[int]): List of angle indices.
            g (Union[float, int]): The cost of the path from the start node.
            h (Union[float, int]): The estimated cost of the path to the target node.
            f (Optional[Union[float, int]]): The total cost; defaults to ``g + h``.
            parent (Optional[Node]): The parent node, or None for a root.
        """
        self.angles = angles
        self.g = g
        self.h = h
        self.f = self.g + h if f is None else f
        self.parent = parent

    def __eq__(self, other):
        """
        Checks if two nodes are equal, i.e. hold the same sequence of angles.
        """
        if not isinstance(other, Node):
            return False
        # Compare whole sequences rather than zipping them: zip() stops at the
        # shorter one, which would make a prefix equal to a longer configuration
        # and disagree with __hash__.
        return tuple(self.angles) == tuple(other.angles)

    def __hash__(self):
        """
        Allows the Node object to be used in sets and dictionaries.
        """
        return hash(tuple(self.angles))

    def __lt__(self, other):
        """
        Compares the f-values of two nodes, used for sorting.
        """
        return self.f < other.f


In [5]:
class SearchTreePQD:
    """
    SearchTree using a priority queue for OPEN and a dictionary for CLOSED.
    """

    def __init__(self):
        self._open = []  # Priority queue (binary heap) for nodes in OPEN
        self._closed = {}  # Dictionary for nodes in CLOSED (expanded nodes)
        self._enc_open_duplicates = 0  # Number of duplicates encountered in OPEN

    def __len__(self) -> int:
        """
        Returns the size of the search tree. Useful for assessing the memory
        footprint of the algorithm, especially at the final iteration.
        """
        return len(self._open) + len(self._closed)

    def open_is_empty(self) -> bool:
        """
        Checks if OPEN is empty.
        If true, the main search loop should be interrupted.
        """
        return not self._open

    def add_to_open(self, item: "Node"):
        """
        Adds a node to the search tree, specifically to OPEN. This node is either
        entirely new or a duplicate of an existing node in OPEN.
        This implementation detects duplicates lazily; thus, nodes are added to
        OPEN without initial duplicate checks.

        Eager detection (scanning OPEN for an equal node, updating its g/f/parent
        and re-heapifying) was tried and made the process too long.
        """
        heappush(self._open, item)

    def get_best_node_from_open(self) -> Optional["Node"]:
        """
        Retrieves the best node from OPEN, defined by the minimum key.
        This node will then be expanded in the main search loop.

        Duplicates are managed here. If a node has been expanded previously
        (and is in CLOSED), it's skipped and the next best node is considered.

        Returns None if OPEN is empty.
        """
        while self._open:
            candidate = heappop(self._open)
            if not self.was_expanded(candidate):
                return candidate
            # Stale copy of an already expanded node: count it and keep popping.
            self._enc_open_duplicates += 1
        return None

    def add_to_closed(self, item: "Node"):
        """
        Adds a node to the CLOSED dictionary.
        """
        self._closed[item] = True

    def was_expanded(self, item: "Node") -> bool:
        """
        Checks if a node has been previously expanded.
        """
        return self._closed.get(item, False)

    @property
    def opened(self):
        """The OPEN heap itself (not a copy)."""
        return self._open

    @property
    def expanded(self):
        """A list of the nodes stored in CLOSED."""
        return list(self._closed)

    @property
    def number_of_open_duplicates(self):
        """How many already expanded nodes were popped from OPEN and skipped."""
        return self._enc_open_duplicates

In [6]:
def manhattan_distance(n, angles1, angles2):
    """
    Sum of per-angle circular distances between two angle configurations.

    Arguments
    ----------
    n : int
        The number of possible directions (divides the circle into n parts)
    angles1 : List[float]
        Angles of the first node.
    angles2 : List[float]
        Angles of the second node.

    Returns
    -------
    float
        For every pair of corresponding angles, the smaller of the two ways
        around the circle, summed over all pairs.
    """
    # Absolute index gap per position, reduced modulo n.
    wrapped_gaps = (abs(first - second) % n for first, second in zip(angles1, angles2))
    # Going the other way around the circle costs n - gap; take the cheaper way.
    return sum(min(gap, n - gap) for gap in wrapped_gaps)


In [7]:
def compute_cost(n, start_angles, goal_angles):
    """
    Cost of a single move between two adjacent angle configurations.

    Parameters
    ----------
    n : int
        The number of possible directions (divides the circle into n parts).
    start_angles : List[float]
        Angles of the node the move starts from.
    goal_angles : List[float]
        Angles of the node the move ends at.

    Returns
    -------
    int
        Always 1 for a valid move.

    Raises
    ------
    ValueError
        If the circular distances between corresponding angles do not sum to
        exactly 1, i.e. the two nodes are not one step apart.
    """
    total_steps = 0
    for begin, end in zip(start_angles, goal_angles):
        gap = abs(begin - end) % n
        # Shortest way around the circle for this angle.
        total_steps += min(gap, n - gap)

    if total_steps != 1:
        raise ValueError("Invalid transition: Exactly one angle must differ.")
    return 1


In [17]:
def make_path(goal: Node) -> Tuple[List[Node], Union[float, int]]:
    """
    Rebuilds the path that ends at ``goal`` by walking the parent links.

    Parameters
    ----------
    goal : Node
        Last node of the path; its ancestors are reached through ``parent``.

    Returns
    -------
    Tuple[List[Node], float]
        The nodes ordered from the root (the node without a parent) to
        ``goal``, and the path length taken from ``goal.g``.
    """
    chain = [goal]
    # Climb until a node without a parent is reached; that node is the start.
    while chain[-1].parent:
        chain.append(chain[-1].parent)
    # Collected goal-first, so flip to start-first order.
    chain.reverse()
    return chain, goal.g

In [18]:
def astar(
    n: int,
    task_graph: Map,
    start_angles: List[int],
    goal_angles: List[int],
    heuristic_func: Callable,
    search_tree: Type["SearchTreePQD"],
    precision: int = 6,
) -> Tuple[bool, Optional[Node], int, int, Optional[Iterable[Node]], Optional[Iterable[Node]]]:
    """
    Runs A* from ``start_angles`` to ``goal_angles`` over ``task_graph``.

    Parameters
    ----------
    n : int
        The number of directions (divides the circle into n parts).
    task_graph : Map
        Graph queried for neighbors through ``get_neighbors(n, angles)``.
    start_angles : List[int]
        Angles of the start node.
    goal_angles : List[int]
        Angles of the goal node.
    heuristic_func : Callable
        Called as ``heuristic_func(n, angles, goal_angles)`` to obtain h.
    search_tree : Type[SearchTreePQD]
        Class (not instance) providing the OPEN and CLOSED containers.
    precision : int
        Not used by this implementation.

    Returns
    -------
    Tuple[bool, Optional[Node], int, int, Optional[Iterable[Node]], Optional[Iterable[Node]]]
        On success: True, the reached goal node, the number of completed
        expansions plus one, the size of the tree, OPEN and the expanded nodes.
        On failure: False, None, the number of completed expansions plus one,
        the size of the tree, None and the expanded nodes.
    """
    tree = search_tree()
    expansions = 0

    # Only the angles of this node matter: it serves as the equality target.
    target = Node(angles=goal_angles, g=0, h=0)

    # Seed OPEN with the start configuration.
    root = Node(
        angles=start_angles,
        g=0,
        h=heuristic_func(n, start_angles, goal_angles),
    )
    tree.add_to_open(root)

    while not tree.open_is_empty():
        current = tree.get_best_node_from_open()
        if current is None:
            # OPEN held nothing but already expanded duplicates.
            break

        # Goal test happens when the node is taken from OPEN.
        if target == current:
            return True, current, expansions + 1, len(tree), tree.opened, tree.expanded

        tree.add_to_closed(current)

        # Expand: build a successor for every reachable neighbor configuration.
        for candidate_angles in task_graph.get_neighbors(n, current.angles):
            step_cost = compute_cost(n, current.angles, candidate_angles)
            successor = Node(
                angles=candidate_angles,
                g=current.g + step_cost,
                h=heuristic_func(n, candidate_angles, goal_angles),
                parent=current,
            )
            # Configurations that were already expanded are not reopened.
            if not tree.was_expanded(successor):
                tree.add_to_open(successor)

        expansions += 1

    return False, None, expansions + 1, len(tree), None, tree.expanded
