### Imports

In [1]:
from typing import Any, Callable
from abc import ABC, abstractmethod
from functools import partial

### Interface

In [2]:
class AbstractHeap(ABC):

    @abstractmethod
    def add(self, element: Any, key: int) -> None:
        pass

    @abstractmethod
    def decrease_key(self, element: Any, new_key: int) -> None:
        pass

    @abstractmethod
    def delete_minimum(self) -> Any:
        pass

    @abstractmethod
    def __bool__(self) -> bool:
        pass

### Heap Implementation

In [3]:
class ArrayHeap(AbstractHeap):

    __slots__ = ['data']

    def __init__(self):
        self.data = []

    def add(self, element: Any, key: int):
        for k, e in self.data:
            if e == element:
                raise ValueError(f'duplicate element: {element}')
        self.data.append((key, element))
        self.data.sort()

    def decrease_key(self, element: Any, new_key: int):
        index = None
        for i, e in enumerate(self.data):
            if e[1] == element:
                index = i
        if index is None:
            raise ValueError(f'no such element: {element}')
        old_key, _ = self.data[index]
        if old_key < new_key:
            raise ValueError(f'cannot increase key for {element} from {old_key} to {new_key}')
        del self.data[index]
        self.add(element, new_key)

    def delete_minimum(self):
        if not self:
            raise ValueError(f'no such element')
        _, rv = self.data[0]
        del self.data[0]
        return rv

    def __bool__(self):
        return bool(self.data)

    def __str__(self):
        return '\n'.join([f'{key}: {element}' for key, element in self.data])

    def __repr__(self):
        return str(self)

### Dijkstra Search

In [4]:
def search(heap_factory: Callable[[], AbstractHeap],
           source_vertex: Any,
           neighbourhood_function: Callable[[Any], list[tuple[Any, int]]],
           is_target: Callable[[Any], bool],
           heuristic: Callable[[Any], int] = lambda _: 0) -> tuple[list[Any], set[Any]]:

    previous_vertices = {} # to reconstruct the path

    frontier = heap_factory()
    frontier.add(source_vertex, 0)
    distances = {source_vertex: 0}

    while frontier:

        current = frontier.delete_minimum()
        old_total_distance = distances[current]

        # if done, path from source to target
        if is_target(current):

            rv = [current]
            while rv[0] != source_vertex:
                p = previous_vertices[rv[0]]
                rv = [p] + rv
            return rv, set(distances)

        for neighbour, cost in neighbourhood_function(current):

            new_total_distance = old_total_distance + cost
            known_distance = None
            if neighbour in distances:
                known_distance = distances[neighbour]

            # newly discovered node
            if not known_distance:
                distances[neighbour] = new_total_distance
                frontier.add(neighbour, new_total_distance)
                previous_vertices[neighbour] = current

            # previously discovered node
            elif new_total_distance < known_distance:
                distances[neighbour] = new_total_distance
                frontier.decrease_key(neighbour, new_total_distance)
                previous_vertices[neighbour] = current

    raise ValueError(f'No path found!!!')

### Test Code

In [5]:
import numpy as np

HEIGHT, WIDTH = 5, 10
ARR = np.zeros((HEIGHT, WIDTH), dtype=int)
ARR[2, 0] = 1
ARR[2, 1] = 1
ARR[1, 1] = 1
ARR[0, 7] = 1
ARR[1, 7] = 1
ARR[1, 6] = 1
ARR[2, 4] = 1
ARR[2, 5] = 1
ARR[2, 3] = 1
ARR[3, 7] = 1
ARR[4, 7] = 1

def neighbours(vertex):
    global HEIGHT, WIDTH, ARR
    y, x = vertex
    assert 0 <= y < HEIGHT and 0 <= x < WIDTH
    y -= 1
    if 0 <= y < HEIGHT and 0 <= x < WIDTH and ARR[y, x] != 1:
        yield (y, x), 1
    y += 1
    x += 1
    if 0 <= y < HEIGHT and 0 <= x < WIDTH and ARR[y, x] != 1:
        yield (y, x), 1
    x -= 1
    y += 1
    if 0 <= y < HEIGHT and 0 <= x < WIDTH and ARR[y, x] != 1:
        yield (y, x), 1
    y -= 1
    x -= 1
    if 0 <= y < HEIGHT and 0 <= x < WIDTH and ARR[y, x] != 1:
        yield (y, x), 1

path, visited = search(ArrayHeap, (0, 0), neighbours, lambda x: x == (1, 8))
for point in visited:
    ARR[point] = 3
for point in path:
    ARR[point] = 2

for y in range(HEIGHT):
    for x in range(WIDTH):
        if ARR[y, x] == 0:
            print('⬛', end='')
        elif ARR[y, x] == 1:
            print('⬜', end='')
        elif ARR[y, x] == 2:
            print('🟩', end='')
        elif ARR[y, x] == 3:
            print('🟨', end='')
        else:
            raise ValueError(ARR[y, x])
    print()#%% md
### Imports

🟩🟩🟩🟨🟨🟨🟨⬜⬛⬛
🟨⬜🟩🟨🟨🟨⬜⬜🟩⬛
⬜⬜🟩⬜⬜⬜🟩🟩🟩🟨
🟨🟨🟩🟩🟩🟩🟩⬜🟨⬛
🟨🟨🟨🟨🟨🟨🟨⬜⬛⬛


In [6]:
from typing import Any, Callable
from abc import ABC, abstractmethod

### Interface

In [7]:
class AbstractHeap(ABC):

    @abstractmethod
    def add(self, element: Any, key: int) -> None:
        pass

    @abstractmethod
    def decrease_key(self, element: Any, new_key: int) -> None:
        pass

    @abstractmethod
    def delete_minimum(self) -> Any:
        pass

    @abstractmethod
    def __bool__(self) -> bool:
        pass

### Heap Implementation

In [8]:
class ArrayHeap(AbstractHeap):

    __slots__ = ['data']

    def __init__(self):
        self.data = []

    def add(self, element: Any, key: int):
        for k, e in self.data:
            if e == element:
                raise ValueError(f'duplicate element: {element}')
        self.data.append((key, element))
        self.data.sort()

    def decrease_key(self, element: Any, new_key: int):
        index = None
        for i, e in enumerate(self.data):
            if e[1] == element:
                index = i
        if index is None:
            raise ValueError(f'no such element: {element}')
        old_key, _ = self.data[index]
        if old_key < new_key:
            raise ValueError(f'cannot increase key for {element} from {old_key} to {new_key}')
        del self.data[index]
        self.add(element, new_key)

    def delete_minimum(self):
        if not self:
            raise ValueError(f'no such element')
        _, rv = self.data[0]
        del self.data[0]
        return rv

    def __bool__(self):
        return bool(self.data)

    def __str__(self):
        return '\n'.join([f'{key}: {element}' for key, element in self.data])

    def __repr__(self):
        return str(self)

### Dijkstra Search

In [9]:
def dijkstra(heap_factory: Callable[[], AbstractHeap],
             source_vertex: Any,
             neighbourhood_function: Callable[[Any], list[tuple[Any, int]]],
             is_target: Callable[[Any], bool]) -> tuple[list[Any], set[Any]]:
    return a_star(heap_factory, source_vertex, neighbourhood_function, is_target, heuristic=lambda _: 0)

In [10]:
def a_star(heap_factory: Callable[[], AbstractHeap],
           source_vertex: Any,
           neighbourhood_function: Callable[[Any], list[tuple[Any, int]]],
           is_target: Callable[[Any], bool],
           heuristic: Callable[[Any], int]) -> tuple[list[Any], set[Any]]:

    previous_vertices = {} # to reconstruct the path

    g_scores = {source_vertex: 0}
    f_scores = {source_vertex: heuristic(source_vertex)}

    frontier = heap_factory()
    frontier.add(source_vertex, f_scores)

    visited = set()

    while frontier:

        current = frontier.delete_minimum()
        visited.add(current)
        current_g_score = g_scores[current]

        # if done, path from source to target
        if is_target(current):

            rv = [current]
            while rv[0] != source_vertex:
                p = previous_vertices[rv[0]]
                rv = [p] + rv
            return rv, visited

        for neighbour, cost in neighbourhood_function(current):

            new_g_score = current_g_score + cost

            known_g_score = None
            if neighbour in g_scores:
                known_g_score = g_scores[neighbour]

            # newly discovered node
            if not known_g_score:
                g_scores[neighbour] = new_g_score
                neighbour_f_score = new_g_score + heuristic(neighbour)
                f_scores[neighbour] = neighbour_f_score
                frontier.add(neighbour, neighbour_f_score)
                previous_vertices[neighbour] = current

            # previously discovered node
            elif new_g_score < known_g_score:
                g_scores[neighbour] = new_g_score
                neighbour_f_score = new_g_score + heuristic(neighbour)
                f_scores[neighbour] = neighbour_f_score
                frontier.decrease_key(neighbour, neighbour_f_score)
                previous_vertices[neighbour] = current

    raise ValueError(f'No path found!!!')

### Test Code

In [11]:
import numpy as np

HEIGHT, WIDTH = 5, 10
ARR = np.zeros((HEIGHT, WIDTH), dtype=int)
ARR[2, 0] = 1
ARR[2, 1] = 1
ARR[1, 1] = 1
ARR[0, 7] = 1
ARR[1, 7] = 1
ARR[1, 6] = 1
ARR[2, 4] = 1
ARR[2, 5] = 1
ARR[2, 3] = 1
ARR[3, 7] = 1
ARR[4, 7] = 1

def neighbours(vertex):
    global HEIGHT, WIDTH, ARR
    y, x = vertex
    assert 0 <= y < HEIGHT and 0 <= x < WIDTH
    y -= 1
    if 0 <= y < HEIGHT and 0 <= x < WIDTH and ARR[y, x] != 1:
        yield (y, x), 1
    y += 1
    x += 1
    if 0 <= y < HEIGHT and 0 <= x < WIDTH and ARR[y, x] != 1:
        yield (y, x), 1
    x -= 1
    y += 1
    if 0 <= y < HEIGHT and 0 <= x < WIDTH and ARR[y, x] != 1:
        yield (y, x), 1
    y -= 1
    x -= 1
    if 0 <= y < HEIGHT and 0 <= x < WIDTH and ARR[y, x] != 1:
        yield (y, x), 1

def manhattan(a, b):
    return abs(a[0] - b[0]) + abs(a[1] - b[1])

TARGET = (1, 8)
heuristic = partial(manhattan, TARGET)

path, visited = dijkstra(ArrayHeap, (0, 0), neighbours, lambda x: x == TARGET)
for point in visited:
    ARR[point] = 3
for point in path:
    ARR[point] = 2

for y in range(HEIGHT):
    for x in range(WIDTH):
        if ARR[y, x] == 0:
            print('⬛', end='')
        elif ARR[y, x] == 1:
            print('⬜', end='')
        elif ARR[y, x] == 2:
            print('🟩', end='')
        elif ARR[y, x] == 3:
            print('🟨', end='')
        else:
            raise ValueError(ARR[y, x])
    print()
print(len(path) - 1)
print(len(visited))

# Dijkstra h(x) = 0
# 🟩🟩🟩🟨🟨🟨🟨⬜⬛⬛
# 🟨⬜🟩🟨🟨🟨⬜⬜🟩⬛
# ⬜⬜🟩⬜⬜⬜🟩🟩🟩⬛
# 🟨🟨🟩🟩🟩🟩🟩⬜⬛⬛
# 🟨🟨🟨🟨🟨🟨🟨⬜⬛⬛
# cost = len(path)-1 = 13 edge units
# visited = len(visited) = 31 vertices

# A* h(x) = mahattan
# 🟩🟩🟩🟨🟨🟨🟨⬜⬛⬛
# 🟨⬜🟩🟨🟨🟨⬜⬜🟩⬛
# ⬜⬜🟩⬜⬜⬜🟩🟩🟩⬛
# ⬛⬛🟩🟩🟩🟩🟩⬜⬛⬛
# ⬛⬛⬛⬛⬛⬛⬛⬜⬛⬛
# cost = len(path)-1 = 13
# visited = len(visited) = 22

🟩🟩🟩🟨🟨🟨🟨⬜⬛⬛
🟨⬜🟩🟨🟨🟨⬜⬜🟩⬛
⬜⬜🟩⬜⬜⬜🟩🟩🟩⬛
🟨🟨🟩🟩🟩🟩🟩⬜⬛⬛
🟨🟨🟨🟨🟨🟨🟨⬜⬛⬛
13
31
