# Safe Interval Path Planning On a Grid :: SIPP Algorithm (based on A* with idea of safe intervals)

In this project, we will consider the problem of finding the fastest safe path on a graph with a special structure, widely used in robotics and computer games — a grid. The grid consists of both free and blocked cells, and an agent can move from one free cell to another. Our grid will also have some obstacles and they will be moving. Algortihm needs to find a route that does not intrfere with obstacles at any point in our discrete time




In [28]:
import random
import traceback
from heapq import heappop, heappush
from pathlib import Path
from textwrap import dedent
from typing import Callable, Dict, Iterable, List, Optional, Tuple, Type, Union

import matplotlib.pyplot as plt
import numpy as np
import numpy.typing as npt
from PIL import Image, ImageDraw

# from tests.Map import Map

from bs4 import BeautifulSoup
import numpy as np
import bisect
import lxml


%matplotlib inline

In [29]:
INF = 100000000


class Interval:
    def __init__(self, is_safe=False, start_time=0, end_time=0):
        self.is_safe = is_safe
        self.start_time = start_time
        self.end_time = end_time

    def empty(self):
        return not self.is_safe and not self.start_time and not self.end_time

    def __eq__(self, other):
        return self.start_time == other.start_time and self.end_time == other.end_time

    def __str__(self):
        return "{" + str(self.is_safe) + ", " + str(self.start_time) + ", " + str(self.end_time) + "}"


class PathComponent:
    def __init__(self, x=0, y=0, time=0):
        self.x = x
        self.y = y
        self.time = time

    def __str__(self):
        return f"({self.x}, {self.y}, {self.time})"


class Obstacle:
    def __init__(self):
        self.path = []


class Map:
    def __init__(self, map_path: str):
        self.cost = 1

        with open(map_path, 'r') as xml_file:
            soup = BeautifulSoup(xml_file, "lxml")
        if soup.map is None:
            raise ValueError("ERROR: nothing in map tag")
        self.soup = soup.map
        self.map = None
        self.dynamic_obstacles = []
        self.width = 0
        self.height = 0
        self.start_i, self.start_j = 0, 0
        self.goal_i, self.goal_j = 0, 0

        self.allow_diagonal = True
        self.cut_corners = True
        self.allow_squeeze = True

        self.__get_map()

    def __get_map(self):
        data_ptrs = [int(self.soup.width.text), int(self.soup.height.text), int(self.soup.startx.text),
                     int(self.soup.starty.text), int(self.soup.finishx.text), int(self.soup.finishy.text)]
        

        for data_ptr in data_ptrs:
            if data_ptr is None or data_ptr <= 0:
                raise ValueError("ERROR: invalid value of the width, height, start of goal position")

        for i in range(2, len(data_ptrs)):
            data_ptrs[i] -= 1

        self.width, self.height, self.start_i, self.start_j, self.goal_i, self.goal_j = data_ptrs

        self.map = [[[] for j in range(self.width)] for i in range(self.height)]
        if self.soup.grid is None:
            raise ValueError("ERROR: nothing in grid tag")
        grid = self.soup.grid
        for i, row in enumerate(grid.find_all("row")):
            for j, el in enumerate(row.get_text().split()):
                el = int(el)
                if bool(el):
                    interval = Interval(is_safe=False, start_time=0, end_time=INF)
                    self.map[i][j].append(interval)

        dynamic_obstacles = self.soup.dynamicobstacles
        for obstacle in dynamic_obstacles.find_all("obstacle"):
            self.dynamic_obstacles.append(Obstacle())
            for point in obstacle.find_all("point"):
                x, y, time = int(point["x"]), int(point["y"]), int(point["time"])
                if self.dynamic_obstacles[-1].path:
                    prev_point = self.dynamic_obstacles[-1].path[-1]
                    distance = self.get_distance(x - 1, y - 1, prev_point[0], prev_point[1])

                    if ((distance == 0 and (time * self.cost - prev_point[2]) <= 0) or
                            (distance != 0 and distance != (time * self.cost - prev_point[2]))):
                        raise ValueError("ERROR: invalid path representation")

                self.dynamic_obstacles[-1].path.append((x - 1, y - 1, time * self.cost))

    def get_successors(self, node):
        successors = []
        for i in range(max(0, node.i - 1), min(node.i + 2, self.get_height())):
            for j in range(max(0, node.j - 1), min(node.j + 2, self.get_width())):
                if self.is_traversable(i, j):
                    cost = self.cost
                    if (i + j) % 2 == (node.i + node.j) % 2:
                        if not self.check_diagonal_successor(node, i, j):
                            continue
                        cost = np.sqrt(2)

                    min_time = node.g + cost / 2
                    if self.map[i][j]:
                        max_time = self.map[node.i][node.j][node.interval].end_time
                    else:
                        max_time = INF

                    if min_time >= max_time:
                        continue

                    self.set_intervals(i, j)

                    interval = (self.get_safe_interval_id(i, j, min_time))
                    successors.append((i, j, cost, interval))
                    # while interval < len(self.map[i][j]) and self.map[i][j][interval].start_time < max_time:
                    #     successor = cells2nodes[(i, j, interval)]
                    #     new_g = max(
                    #         node.g + cost,
                    #         self.map[i][j][interval].startTime + cost / 2
                    #     )
                    #
                    #     if successor is None:
                    #         successor = Node(
                    #             i,
                    #             j,
                    #             interval,
                    #             new_g,
                    #             getHeuristic(i, j, map),
                    #             node
                    #         )
                    #         cells2nodes[(i, j, interval)] = successor
                    #         OPEN.append(successor)
                    #     successors.append((successor, new_g))
                    #
                    # interval += 1

        return successors

    def check_diagonal_successor(self, node, i, j):
        if not self.allow_diagonal or (node.i == i and node.j == j):
            return False

        near_cell1 = self.is_traversable(node.i, j)
        near_cell2 = self.is_traversable(i, node.j)

        return (near_cell1 and near_cell2) or \
               (self.cut_corners and (near_cell1 or near_cell2)) or \
               (self.allow_squeeze and not near_cell1 and not near_cell2)

    def get_distance(self, i1, j1, i2, j2):
        return (abs(i2 - i1) + abs(j2 - j1)) * self.cost

    def get_cost(self):
        return self.cost

    def is_traversable(self, i, j):
        print(f"is_traversable {i} {j}")
        return not self.map[i][j] or self.map[i][j][0].is_safe

    def get_width(self):
        return self.width

    def get_height(self):
        return self.height

    def get_start_cell(self):
        return self.start_i, self.start_j

    def get_goal_cell(self):
        return self.goal_i, self.goal_j

    def point_on_segment(self, point, p1, p2) -> bool:
        point, p1, p2 = np.array(point), np.array(p1), np.array(p2)
        diff1, diff2 = p1 - point, p2 - point
        return diff1[0] * diff2[1] - diff1[1] * diff2[0] == 0 and diff1[0] * diff1[1] + diff2[0] * diff2[1] >= 0

    def set_intervals(self, i, j):
        if self.map[i][j]:
            return

        collision_intervals: list[Interval] = []

        for obstacle in self.dynamic_obstacles:
            path = obstacle.path
            print(path[0])
            for point_i in range(1, len(path)):
                if not self.point_on_segment((j, i), [path[point_i - 1][0], path[point_i - 1][1]],
                                             [path[point_i][0], path[point_i][1]]):
                    continue

                collision_intervals.append(
                    Interval(
                        False,
                        path[point_i - 1][2] + self.get_distance(i, j, path[point_i - 1][1],
                                                                   path[point_i - 1][0]) - self.cost / 2,
                        path[point_i - 1][2] + self.get_distance(i, j, path[point_i - 1][1],
                                                                   path[point_i - 1][0]) + self.cost / 2
                    )
                )

                if self.get_distance(path[point_i - 1][1], path[point_i - 1][0], path[point_i][1], path[point_i][0]) == 0:
                    collision_intervals[-1].end_time = path[point_i][2] + self.cost / 2

        collision_intervals.sort(key=lambda x: x.start_time)

        if not collision_intervals:
            self.map[i][j].append(Interval(True, 0, INF))
            return

        for interval in collision_intervals:
            if not self.map[i][j] or interval.start_time > self.map[i][j][-1].end_time:
                self.map[i][j].append(interval)

            self.map[i][j][-1].end_time = max(self.map[i][j][-1].end_time, interval.end_time)

        if self.map[i][j][0].start_time > 0:
            self.map[i][j].append(Interval(True, self.map[i][j][-1].end_time, INF))
            for interval in range(len(self.map[i][j]) - 2, -1, -1):
                self.map[i][j][interval].is_safe = True
                self.map[i][j][interval].end_tme = self.map[i][j][interval].start_time
                self.map[i][j][interval].start_time = 0 if interval == 0 else self.map[i][j][interval - 1].end_time
        else:
            for interval in range(len(self.map[i][j])):
                self.map[i][j][interval].is_safe = True
                self.map[i][j][interval].start_time = self.map[i][j][interval].end_time

                if interval + 1 == len(self.map[i][j]):
                    self.map[i][j][interval].end_time = INF
                    continue

                self.map[i][j][interval].end = self.map[i][j][interval + 1].start_time

    def get_safe_interval_id(self, i, j, time):
        def compare(time, interval):
            return time < interval.end_time

        return bisect.bisect_right(self.map[i][j], time, key=lambda x: compare(time, x)) - 1

    def get_interval_start(self, i, j, interval):
        return self.map[i][j][interval].start_time

    def get_interval_end(self, i, j, interval):
        return self.map[i][j][interval].end_time


In [30]:
class Node:
    """
    Represents a search node.

    Attributes
    ----------
    i : int
        Row coordinate of the corresponding grid element.
    j : int
        Column coordinate of the corresponding grid element.
    g : float | int
        g-value of the node.
    h : float | int
        h-value of the node
    f : float | int
        f-value of the node
    parent : Node
        Pointer to the parent node.
    """

    def __init__(
        self,
        i: int,
        j: int,
        interval: int,
        g: Union[float, int] = 0,
        h: Union[float, int] = 0,
        f: Optional[Union[float, int]] = None,
        parent: "Node" = None,
    ):
        """
        Initializes a search node.

        Parameters
        ----------
        i : int
            Row coordinate of the corresponding grid element.
        j : int
            Column coordinate of the corresponding grid element.
        g : float | int
            g-value of the node.
        h : float | int
            h-value of the node (always 0 for Dijkstra).
        f : float | int
            f-value of the node (always equal to g-value for Dijkstra).
        parent : Node
            Pointer to the parent node.
        """
        self.i = i
        self.j = j
        self.g = g
        self.h = h
        self.interval = interval
        if f is None:
            self.f = self.g + h
        else:
            self.f = f
        self.parent = parent

    def __eq__(self, other):
        """
        Checks if two search nodes are the same, which is needed to detect duplicates in the search tree.
        """
        return self.i == other.i and self.j == other.j

    def __hash__(self):
        """
        Makes the Node object hashable, allowing it to be used in sets/dictionaries.
        """
        return hash((self.i, self.j))

    def __lt__(self, other):
        """
        Compares the keys (i.e., the f-values) of two nodes, needed for sorting/extracting the best element from OPEN.
        """
        if self.f == other.f:
            return self.g > other.g
        return self.f < other.f

In [31]:
test_map = Map("../data/den101d_200_1.xml")

Computes the cost of a transition from cell `(i1, j1)` to cell `(i2, j2)`. In the case of a 4-connected grid, the cost is always equal to 1. Consequently, the cost of each path is an integer.

However, please note that if the grid's connectivity exceeds 4, the cost of a transition may vary and may become fractional or even arbitrary (e.g., $\sqrt{2}$ for diagonal moves).

Recall that in this lab, we assume a 4-connected grid, meaning that only cardinal moves (up, down, left, right) are allowed, and as a result, all costs are integers.


## Search Node

A Search node is a fundamental concept in heuristic search algorithms. It encapsulates data about the state of the problem (e.g., the position of a robot or agent on a grid) and the information needed to construct a search tree, including g-values, h-values, f-values, and a backpointer to its predecessor.


## Visualization

## Implementing the Search Tree (i.e., OPEN and CLOSED)
An efficient implementation of the search tree (`OPEN` and `CLOSED`) is crucial for any search algorithm. You should utilize the efficient search tree implementation you developed in the previous lab.

In [32]:
class SearchTreePQD:
    """
    SearchTree using a priority queue for OPEN and a dictionary for CLOSED.
    """

    def __init__(self):
        self._open = []  # Priority queue for nodes in OPEN
        self._closed = {}  # Dictionary for nodes in CLOSED (expanded nodes)
        self._enc_open_duplicates = 0  # Number of duplicates encountered in OPEN

    def __len__(self) -> int:
        """
        Returns the size of the search tree. Useful for assessing the memory
        footprint of the algorithm, especially at the final iteration.
        """
        return len(self._open) + len(self._closed)

    def open_is_empty(self) -> bool:
        """
        Checks if OPEN is empty.
        If true, the main search loop should be interrupted.
        """
        return len(self._open) == 0

    def add_to_open(self, item: Node):
        """
        Adds a node to the search tree, specifically to OPEN. This node is either
        entirely new or a duplicate of an existing node in OPEN.
        This implementation detects duplicates lazily; thus, nodes are added to
        OPEN without initial duplicate checks.
        """
        heappush(self._open, item)

    def get_best_node_from_open(self) -> Optional[Node]:
        """
        Retrieves the best node from OPEN, defined by the minimum key.
        This node will then be expanded in the main search loop.

        Duplicates are managed here. If a node has been expanded previously
        (and is in CLOSED), it's skipped and the next best node is considered.

        Returns None if OPEN is empty.
        """
        while not self.open_is_empty():
            item = heappop(self._open)
            if item not in self._closed:
                return item
            else:
                self._enc_open_duplicates += 1
        return None

    def add_to_closed(self, item: Node):
        """
        Adds a node to the CLOSED dictionary.
        """
        self._closed[item] = item

    def was_expanded(self, item: Node) -> bool:
        """
        Checks if a node has been previously expanded.
        """
        return item in self._closed.keys()

    @property
    def opened(self):
        return self._open

    @property
    def expanded(self):
        return self._closed.keys()

    @property
    def number_of_open_dublicates(self):
        return self._enc_open_duplicates

## Validating the Results

## A* Algorithm Without Reexpansions

The primary component of the `A*` algorithm is the heuristic function.

Ideally, this function should be admissible (it never overestimates the true cost to the goal) and consistent (satisfies the triangle inequality). The Manhattan distance serves as an admissible and consistent heuristic for 4-connected grids. Therefore, if we integrate it into the `Dijkstra` search loop from the previous lab, we'll obtain the `A*` algorithm.

Let's proceed!


The input for the `A*` algorithm implementation is the same as for `Dijkstra`:

The input is:
- map representation
- start/goal cells
- heuristic function $^*$
- a reference to the implementation of the SearchTree

The output is:
- path found flag (`true` or `false`)
- last node of the path (so one can unwind it using the parent-pointers and get the full path)
- the number of steps (iterations of the main loop)
- the number of nodes that compose the search tree at the final iteration of the algorithm (=the size of the resultant search tree)
- OPEN and CLOSED (as iterable collections of nodes) for further visualization purposes


PS: You might also want to display, at the final iteration, the number of OPEN duplicates encountered during the search, as shown below:

```print("During the search, the following number of OPEN dublicates was encountered: ", ast.number_of_open_duplicates) ```


In [33]:
def astar(
    task_map: Map,
    start_i: int,
    start_j: int,
    goal_i: int,
    goal_j: int,
    heuristic_func: Callable,
    search_tree: Type[SearchTreePQD],
) -> Tuple[bool, Optional[Node], int, int, Optional[Iterable[Node]], Optional[Iterable[Node]]]:
    """
    Implements the A* search algorithm.

    Parameters
    ----------
    task_map : Map
        The grid or map being searched.
    start_i, start_j : int, int
        Starting coordinates.
    goal_i, goal_j : int, int
        Goal coordinates.
    heuristic_func : Callable
        Heuristic function for estimating the distance from a node to the goal.
    search_tree : Type[SearchTreePQD]
        The search tree to use.

    Returns
    -------
    Tuple[bool, Optional[Node], int, int, Optional[Iterable[Node]], Optional[Iterable[Node]]]
        Tuple containing:
        - A boolean indicating if a path was found.
        - The last node in the found path or None.
        - Number of algorithm iterations.
        - Size of the resultant search tree.
        - OPEN set nodes for visualization or None.
        - CLOSED set nodes.
    """
    ast = search_tree()
    steps = 0

    start_node = Node(start_i, start_j, interval = 0, g=0, h=heuristic_func(start_i, start_j, goal_i, goal_j))
    ast.add_to_open(start_node)

    while not ast.open_is_empty():
        current = ast.get_best_node_from_open()

        steps += 1

        if current is None:
            break

        if current.i == goal_i and current.j == goal_j:  # is goal
            return True, current, steps, len(ast), ast.opened, ast.expanded
        
        ast.add_to_closed(current)

        print(f"Current node {current.i}, {current.j}, interval {current.interval}")

        for i, j, cost, interval  in task_map.get_successors(current):
            print(i,j, interval)
            new = Node(i, j, interval)
            if not ast.was_expanded(new):

                new.parent = current

                new.h = heuristic_func(i, j, goal_i, goal_j)
                new.g = current.g + task_map.get_distance(current.i, current.j, i, j)
                new.f = new.g  + new.h

                ast.add_to_open(new)

    return False, None, steps, len(ast), None, ast.expanded

In [34]:
print(len(test_map.map), len(test_map.map[0]))

41 73


In [42]:
_, current, _, _, _, _ = astar(test_map, test_map.start_j, test_map.start_i, test_map.goal_j, test_map.goal_i, test_map.get_distance, SearchTreePQD)

Current node 33, 9, interval 0
is_traversable 32 8
is_traversable 33 8
is_traversable 32 9
is_traversable 32 9
is_traversable 32 10
is_traversable 33 10
is_traversable 32 9
is_traversable 33 8
is_traversable 33 9
is_traversable 33 10
is_traversable 34 8
is_traversable 33 8
is_traversable 34 9
is_traversable 34 9
is_traversable 34 10
is_traversable 33 10
is_traversable 34 9
32 8 -1
32 9 -1
32 10 -1
33 8 -1
33 10 -1
34 8 -1
34 9 -1
34 10 -1
Current node 32, 10, interval -1
is_traversable 31 9
is_traversable 32 9
is_traversable 31 10
is_traversable 31 10
is_traversable 31 11
is_traversable 32 11
is_traversable 31 10
is_traversable 32 9
is_traversable 32 10
is_traversable 32 11
is_traversable 33 9
is_traversable 32 9
is_traversable 33 10
is_traversable 33 10
is_traversable 33 11
is_traversable 32 11
is_traversable 33 10
31 9 36
31 10 36
31 11 37
32 9 37
32 11 33
33 9 35
33 10 27
33 11 30
Current node 31, 11, interval 37
is_traversable 30 10
is_traversable 31 10
is_traversable 30 11
is_trav

In [43]:
lst = [current]
while current.parent is not None:
    lst.append(current.parent)
    current = current.parent

print(len(lst))

for el in list(reversed(lst)):
    print(el.i, el.j)

70
33 9
32 10
31 11
30 12
29 13
28 14
27 15
26 16
26 17
26 18
26 19
26 20
26 21
26 22
26 23
26 24
26 25
26 26
26 27
26 28
26 29
26 30
26 31
26 32
26 33
26 34
26 35
26 36
26 37
26 38
26 39
26 40
26 41
26 42
26 43
26 44
26 45
26 46
26 47
26 48
26 49
26 50
26 51
26 52
26 53
26 54
27 55
27 56
27 57
26 58
25 59
24 60
23 61
22 62
21 63
20 64
19 65
18 66
17 67
16 68
15 69
14 69
13 69
12 69
11 69
10 69
9 69
8 69
7 69
6 69


In [36]:
print(test_map.map[33][9])

[<__main__.Interval object at 0x136d12310>, <__main__.Interval object at 0x136d42950>, <__main__.Interval object at 0x136d06d50>, <__main__.Interval object at 0x136d10410>, <__main__.Interval object at 0x136d14a10>, <__main__.Interval object at 0x136d07850>, <__main__.Interval object at 0x136d1d250>, <__main__.Interval object at 0x136d1c050>, <__main__.Interval object at 0x136d07990>, <__main__.Interval object at 0x136d13c50>, <__main__.Interval object at 0x136d0e250>, <__main__.Interval object at 0x136d05e50>, <__main__.Interval object at 0x136d06f10>, <__main__.Interval object at 0x136d10950>, <__main__.Interval object at 0x136d0f710>, <__main__.Interval object at 0x136d14490>, <__main__.Interval object at 0x136d0e9d0>, <__main__.Interval object at 0x136d15f50>, <__main__.Interval object at 0x136d06f50>, <__main__.Interval object at 0x136d0e3d0>, <__main__.Interval object at 0x136d13510>, <__main__.Interval object at 0x136d17150>, <__main__.Interval object at 0x136d0fad0>, <__main__.