In [44]:
import math
import random
import traceback
from heapq import heappop, heappush
from pathlib import Path
from textwrap import dedent
from typing import Callable, Dict, Iterable, List, Optional, Tuple, Type, Union

import matplotlib.pyplot as plt
import numpy as np
import numpy.typing as npt
from PIL import Image, ImageDraw

from dataclasses import dataclass

%matplotlib inline

## Grid map representation

In [7]:
class Map:

    def __init__(self, cells: npt.NDArray):
        self._width = cells.shape[1]
        self._height = cells.shape[0]
        self._cells = cells

    def in_bounds(self, i: int, j: int) -> bool:
        return 0 <= j < self._width and 0 <= i < self._height

    def traversable(self, i: int, j: int) -> bool:
        return not self._cells[i, j]

    def get_neighbors(self, i: int, j: int) -> List[Tuple[int, int]]:
        neighbors = []
        delta_cardinal = ((0, 1), (1, 0), (0, -1), (-1, 0))
        for dx, dy in delta_cardinal:
            ni, nj = i + dx, j + dy
            if self.in_bounds(ni, nj) and self.traversable(ni, nj):
                neighbors.append((ni, nj))
        delta_diagonal = ((1, 1), (1, -1), (-1, 1), (-1, -1))
        for dx, dy in delta_diagonal:
            ni, nj = i + dx, j + dy
            if (self.in_bounds(ni, nj) 
                    and self.traversable(ni, nj) 
                    and self.traversable(i + dx, j)
                    and self.traversable(i, j + dy)
            ):
                neighbors.append((ni, nj))
        return neighbors

    def get_size(self) -> Tuple[int, int]:
        return self._height, self._width

In [9]:
def compute_cost(i1: int, j1: int, i2: int, j2: int) -> float:
    if abs(i1 - i2) + abs(j1 - j2) == 1:  # Cardinal move
        return 1
    elif abs(i1 - i2) == 1 and abs(j1 - j2) == 1:  # Diagonal move
        return math.sqrt(2)
    else:
        raise ValueError("Trying to compute the cost of a non-supported move! ONLY cardinal moves are supported.")

### Read map representations and scenarios

In [34]:
def read_map_from_file(filename: str) -> "Map":
    f = open(filename, 'r')
    lines = f.readlines()
    height = int(lines[1].split()[1])
    width = int(lines[1].split()[1])
    cells = np.array(
        [[0 if (char == "." or char == "G") else 1 for char in line.rstrip()] for line in lines[4:] if line],
        dtype=np.int8,
    )
    f.close()
    assert cells.shape == (height, width)
    return Map(cells)

In [37]:
sample_map = read_map_from_file('test_data/sample_grid/sample.map')

In [48]:
@dataclass
class TestScen:
    map_name: str
    width: int
    height: int
    start_x: int
    start_y: int
    goal_x: int
    goal_y: int
    optimal_length: float

In [46]:
def read_scen_from_file(filename: str) -> List["TestScen"]:
    f = open(filename, 'r')
    lines = f.readlines()
    cases = []
    for line in lines[1:]:
        if line:
            _, map_name, width, height, start_x, start_y, goal_x, goal_y, optimal_length = line.split()
            cases.append(TestScen(map_name, width, height, start_x, start_y, goal_x, goal_y, optimal_length))
    f.close()
    return cases

In [47]:
sample_scens = read_scen_from_file('test_data/sample_grid/sample.scen')
print(sample_scens)

[TestScen(map_name='sample.map', width='4', height='4', start_x='0', start_y='0', goal_x='3', goal_y='3', optimal_length='5.41421356')]


In [None]:
def test_scen(
        map: Map,
        scen: TestScen,
        algorithm: Callable,
        search_tree, heuristic_func
    ):
    res = algorithm(
        task_map=map,
        start_i=scen.start_x,
        start_j=scen.start_y,
        goal_i=scen.goal_x,
        goal_j=scen.goal_y,
        heuristic_func=None,
        search_tree=search_tree
    )
    print(res)

## Dijkstra

Dijkstra algorithms implementation from lab_02 for baseline solution

In [2]:
class DijkstraNode:
    """
    Represents a search node.

    Attributes
    ----------
    i : int
        Row coordinate of the corresponding grid element.
    j : int
        Column coordinate of the corresponding grid element.
    g : float | int
        g-value of the node.
    h : float | int
        h-value of the node (always 0 for Dijkstra).
    f : float | int
        f-value of the node (always equal to g-value for Dijkstra).
    parent : Node
        Pointer to the parent node.
    """

    def __init__(
        self,
        i: int,
        j: int,
        g: Union[float, int] = 0,
        h: Union[float, int] = 0,
        f: Optional[Union[float, int]] = None,
        parent: "Node" = None,
    ):
        """
        Initializes a search node.

        Parameters
        ----------
        i : int
            Row coordinate of the corresponding grid element.
        j : int
            Column coordinate of the corresponding grid element.
        g : float | int
            g-value of the node.
        h : float | int
            h-value of the node (always 0 for Dijkstra).
        f : float | int
            f-value of the node (always equal to g-value for Dijkstra).
        parent : Node
            Pointer to the parent node.
        """
        self.i = i
        self.j = j
        self.g = g
        self.h = h
        if f is None:
            self.f = self.g + h
        else:
            self.f = f
        self.parent = parent

    def __eq__(self, other):
        """
        Checks if two search nodes are the same, which is needed to detect duplicates in the search tree.
        """
        return (self.i, self.j) == (other.i, other.j)

    def __lt__(self, other):
        """
        Compares the keys (i.e., the f-values) of two nodes, needed for sorting/extracting the best element from OPEN.
        """
        return self.g < other.g

In [5]:
class DijkstraSearchTreePQD:
    """
    SearchTree using a priority queue for OPEN and a dictionary for CLOSED.
    """

    def __init__(self):
        self._open = []  # Priority queue for nodes in OPEN
        self._closed = dict()  # Dictionary for nodes in CLOSED (expanded nodes)
        self._enc_open_duplicates = 0  # Number of duplicates encountered in OPEN

    def __len__(self) -> int:
        """
        Returns the size of the search tree. Useful for assessing the memory
        footprint of the algorithm, especially at the final iteration.
        """
        return len(self._open) + len(self._closed)

    def open_is_empty(self) -> bool:
        """
        Checks if OPEN is empty.
        If true, the main search loop should be interrupted.
        """
        return len(self._open) == 0

    def add_to_open(self, item: DijkstraNode):
        """
        Adds a node to the search tree, specifically to OPEN. This node is either
        entirely new or a duplicate of an existing node in OPEN.
        This implementation detects duplicates lazily; thus, nodes are added to
        OPEN without initial duplicate checks.
        """
        heappush(self._open, item)

    def extract_best_node_from_open(self) -> Optional[DijkstraNode]:
        """
        Retrieves the best node from OPEN, defined by the minimum key.
        This node will then be expanded in the main search loop.

        Duplicates are managed here. If a node has been expanded previously
        (and is in CLOSED), it's skipped and the next best node is considered.

        Returns None if OPEN is empty.
        """
        best_node: Optional[DijkstraNode] = None
        while self.opened and not best_node:
            extracted_node = heappop(self._open)
            if self.was_expanded(extracted_node):
                self._enc_open_duplicates += 1
            else:
                best_node = extracted_node
        return best_node

    def add_to_closed(self, item: DijkstraNode):
        """
        Adds a node to the CLOSED dictionary.
        """
        self._closed[(item.i, item.j)] = item

    def was_expanded(self, item: DijkstraNode) -> bool:
        """
        Checks if a node has been previously expanded.
        """
        return (item.i, item.j) in self._closed
    
    def was_expanded_vertex(self, i: int, j: int):
        return (i, j) in self._closed

    @property
    def opened(self):
        return self._open

    @property
    def expanded(self):
        return self._closed.values()

    @property
    def number_of_open_duplicates(self):
        return self._enc_open_duplicates

In [8]:
def dijkstra(
    task_map: Map,
    start_i: int,
    start_j: int,
    goal_i: int,
    goal_j: int,
    heuristic_func: Optional[Callable],
    search_tree: "DijkstraSearchTreePQD",
) -> Tuple[bool, Optional[DijkstraNode], int, int, Optional[Iterable[DijkstraNode]], Optional[Iterable[DijkstraNode]]]:
    """
    Implementation of Dijkstra algorithm.
    """
    dst = search_tree()  # Dijkstra's search tree
    steps = 0
    start_node = DijkstraNode(i=start_i, j=start_j, g=0, parent=None)
    dst.add_to_open(start_node)

    while dst.opened:
        steps += 1
        curr_node: Optional["DijkstraNode"] = dst.extract_best_node_from_open()
        if not curr_node:  # self._open is not empty, but only has duplicate nodes
            break
        if (curr_node.i, curr_node.j) == (goal_i, goal_j):
            return True, curr_node, steps, len(dst), dst.opened, dst.expanded
        for (i, j) in task_map.get_neighbors(curr_node.i, curr_node.j):
            if dst.was_expanded_vertex(i, j):
                pass
            else:
                new_node = DijkstraNode(
                    i=i,
                    j=j, 
                    g=curr_node.g + compute_cost(curr_node.i, curr_node.j, i, j), 
                    parent=curr_node
                )
                dst.add_to_open(new_node)
        dst.add_to_closed(curr_node)

    return False, None, steps, len(dst), None, dst.expanded