In [1]:
# Search algorithms
# Implementing https://www.redblobgames.com/pathfinding/a-star/implementation.html
import black
import jupyter_black

jupyter_black.load(lab=True, target_version=black.TargetVersion.PY310)

from typing import TypeVar, Protocol, List, Dict, Tuple, Iterator, Optional

T = TypeVar("T")
Location = TypeVar("Location")


class Graph(Protocol):
    def neighbors(self, id: Location) -> List[Location]:
        pass


class SimpleGraph:
    def __init__(self):
        self.edges: Dict[Location, List[Location]] = {}

    def neighbors(self, id: Location) -> List[Location]:
        return self.edges[id]


class WeightedGraph(Graph):
    def cost(self, from_id: Location, to_id: Location) -> float:
        pass


example_graph = SimpleGraph()
example_graph.edges = {
    "a": "b",
    "b": ["c"],
    "c": ["b", "d", "f"],
    "d": ["c", "e"],
    "e": ["f"],
    "f": [],
}

import collections


class Queue:
    def __init__(self):
        self.elements = collections.deque()

    def empty(self):
        return not self.elements

    def put(self, x: T):
        self.elements.append(x)

    def get(self) -> T:
        return self.elements.popleft()


import heapq


class PriorityQueue:
    def __init__(self):
        self.elements: List[Tuple[float, T]] = []

    def empty(self) -> bool:
        return not self.elements

    def put(self, item: T, priority: float):
        heapq.heappush(self.elements, (priority, item))

    def get(self) -> T:
        return heapq.heappop(self.elements)[1]


GridLocation = Tuple[int, int]


class SquareGrid:
    def __init__(self, width: int, height: int):
        self.width = width
        self.height = height
        self.walls: List[GridLocation] = []

    def in_bounds(self, id: GridLocation) -> bool:
        (x, y) = id
        return 0 <= x < self.width and 0 <= y < self.height

    def passable(self, id: GridLocation) -> bool:
        return id not in self.walls

    def neighbors(self, id: GridLocation) -> Iterator[GridLocation]:
        (x, y) = id
        neighbors = [(x + 1, y), (x - 1, y), (x, y + 1), (x, y - 1)]
        if (x + y) % 2 == 0:
            neighbors.reverse()
        return [
            node for node in neighbors if self.in_bounds(node) and self.passable(node)
        ]


class GridWithWeights(SquareGrid):
    def __init__(self, width: int, height: int):
        super().__init__(width, height)
        self.weights: Dict[GridLocation, float] = {}

    def cost(self, from_node: GridLocation, to_node: GridLocation) -> float:
        return self.weights.get(to_node, 1)


def from_id_width(id, width):
    return (id % width, id // width)


def draw_tile(graph, id, style):
    r = " . "
    if "number" in style and id in style["number"]:
        r = " %-2d" % style["number"][id]
    if "point_to" in style and style["point_to"].get(id, None) is not None:
        (x1, y1) = id
        (x2, y2) = style["point_to"][id]
        if x2 == x1 + 1:
            r = " → "
        if x2 == x1 - 1:
            r = " ← "
        if y2 == y1 + 1:
            r = " ↓ "
        if y2 == y1 - 1:
            r = " ↑ "
    if "path" in style and id in style["path"]:
        r = " @ "
    if "start" in style and id == style["start"]:
        r = " A "
    if "goal" in style and id == style["goal"]:
        r = " Z "
    if id in graph.walls:
        r = "###"
    return r


def draw_grid(graph, **style):
    print("___" * graph.width)
    for y in range(graph.height):
        for x in range(graph.width):
            print("%s" % draw_tile(graph, (x, y), style), end="")
        print()
    print("~~~" * graph.width)


def reconstruct_path(
    came_from: Dict[Location, Location], start: Location, goal: Location
) -> List[Location]:
    current: Location = goal
    path: List[Location] = []
    while current != start:
        path.append(current)
        current = came_from[current]
    path.append(start)
    path.reverse()
    return path

In [6]:
# fmt: off
DIAGRAM1_WALLS = [from_id_width(id, width=30) for id in [21,22,51,52,81,82,93,94,111,112,123,124,133,134,141,142,153,154,163,164,171,172,173,174,175,183,184,193,194,201,202,203,204,205,213,214,223,224,243,244,253,254,273,274,283,284,303,304,313,314,333,334,343,344,373,374,403,404,433,434]]

diagram4 = GridWithWeights(10, 10)
diagram4.walls = [(1, 7), (1, 8), (2, 7), (2, 8), (3, 7), (3, 8)]
diagram4.weights = {loc: 5 for loc in [(3, 4), (3, 5), (4, 1), (4, 2),
                                       (4, 3), (4, 4), (4, 5), (4, 6),
                                       (4, 7), (4, 8), (5, 1), (5, 2),
                                       (5, 3), (5, 4), (5, 5), (5, 6),
                                       (5, 7), (5, 8), (6, 2), (6, 3),
                                       (6, 4), (6, 5), (6, 6), (6, 7),
                                       (7, 3), (7, 4), (7, 5)]}
# fmt: on

In [7]:
def breadth_first_search(graph: Graph, start: Location, goal: Location):
    frontier = Queue()
    frontier.put(start)
    came_from: Dict[Location, Optional[Location]] = {}
    came_from[start] = None

    while not frontier.empty():
        current: Location = frontier.get()

        if current == goal:
            break

        for next in graph.neighbors(current):
            if next not in came_from:
                frontier.put(next)
                came_from[next] = current

    return came_from


g = SquareGrid(30, 15)
g.walls = DIAGRAM1_WALLS
start = (8, 7)
goal = (17, 2)
parents = breadth_first_search(g, start, goal)
draw_grid(g, point_to=parents, start=start, goal=goal)

__________________________________________________________________________________________
 .  →  →  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ←  .  .  .  . ###### .  .  .  .  .  .  . 
 →  →  →  →  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ←  ←  ←  .  .  . ###### .  .  .  .  .  .  . 
 →  →  →  →  →  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ←  ←  ←  Z  .  .  . ###### .  .  .  .  .  .  . 
 →  →  ↑ ###### ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓  ←  ←  ←  ←  ←  ←  .  . ###### .  .  .  .  .  .  . 
 .  ↑  ↑ ###### ↓  ↓  ↓  ↓  ↓  ↓  ↓  ← ###### ↑  ←  ←  .  .  . ###### .  .  .  .  .  .  . 
 .  .  ↑ ###### →  ↓  ↓  ↓  ↓  ↓  ←  ← ###### ↑  ↑  .  .  .  . ############### .  .  .  . 
 .  .  . ###### →  →  ↓  ↓  ↓  ←  ←  ← ###### ↑  .  .  .  .  . ############### .  .  .  . 
 .  .  . ###### →  →  →  A  ←  ←  ←  ← ###### .  .  .  .  .  .  .  .  .  .  .  .  .  .  . 
 .  .  . ###### →  →  ↑  ↑  ↑  ←  ←  ← ###### .  .  .  .  .  .  .  .  .  .  .  .  .  .  . 
 .  .  ↓ ###### →  ↑  ↑  ↑  ↑  ↑  ←  ← ###### .  .  .  .  .  .  .  .  .  .  .  .  .  .  . 

In [4]:
def dijkstra_search(graph: WeightedGraph, start: Location, goal: Location):
    frontier = PriorityQueue()
    frontier.put(start, 0)
    came_from: Dict[Location, Optional[Location]] = {}
    cost_so_far: Dict[Location, float] = {}
    came_from[start] = None
    cost_so_far[start] = 0

    while not frontier.empty():
        current: Location = frontier.get()

        if current == goal:
            break

        for next in graph.neighbors(current):
            new_cost = cost_so_far[current] + graph.cost(current, next)
            if next not in cost_so_far or new_cost < cost_so_far[next]:
                cost_so_far[next] = new_cost
                priority = new_cost
                frontier.put(next, priority)
                came_from[next] = current

    return came_from, cost_so_far


start, goal = (1, 4), (8, 3)
came_from, cost_so_far = dijkstra_search(diagram4, start, goal)
draw_grid(diagram4, point_to=came_from, start=start, goal=goal)
draw_grid(diagram4, path=reconstruct_path(came_from, start=start, goal=goal))

______________________________
 ↓  ↓  ←  ←  ←  ←  ←  ←  ←  ← 
 ↓  ↓  ←  ←  ←  ↑  ↑  ←  ←  ← 
 ↓  ↓  ←  ←  ←  ←  ↑  ↑  ←  ← 
 ↓  ↓  ←  ←  ←  ←  ←  ↑  Z  . 
 →  A  ←  ←  ←  ←  .  .  .  . 
 ↑  ↑  ←  ←  ←  ←  .  .  .  . 
 ↑  ↑  ←  ←  ←  ←  ←  .  .  . 
 ↑ ######### ↑  ←  ↓  ↓  .  . 
 ↑ ######### ↓  ↓  ↓  ←  ←  . 
 ↑  ←  ←  ←  ←  ←  ←  ←  ←  . 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
______________________________
 .  @  @  @  @  @  @  .  .  . 
 .  @  .  .  .  .  @  @  .  . 
 .  @  .  .  .  .  .  @  @  . 
 .  @  .  .  .  .  .  .  @  . 
 .  @  .  .  .  .  .  .  .  . 
 .  .  .  .  .  .  .  .  .  . 
 .  .  .  .  .  .  .  .  .  . 
 . ######### .  .  .  .  .  . 
 . ######### .  .  .  .  .  . 
 .  .  .  .  .  .  .  .  .  . 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~


In [140]:
draw_grid(diagram4, number=cost_so_far, start=start, goal=goal)

______________________________
 5  4  5  6  7  8  9  10 11 12
 4  3  4  5  10 13 10 11 12 13
 3  2  3  4  9  14 15 12 13 14
 2  1  2  3  8  13 18 17 Z  . 
 1  A  1  6  11 16 .  .  .  . 
 2  1  2  7  12 17 .  .  .  . 
 3  2  3  4  9  14 19 .  .  . 
 4 ######### 14 19 18 15 .  . 
 5 ######### 15 16 13 14 15 . 
 6  7  8  9  10 11 12 13 14 . 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~


In [142]:
def heuristic(a: GridLocation, b: GridLocation) -> float:
    (x1, y1) = a
    (x2, y2) = b
    return abs(x1 - x2) + abs(y1 - y2)


def a_star_search(graph: WeightedGraph, start: Location, goal: Location):
    frontier = PriorityQueue()
    frontier.put(start, 0)
    came_from: Dict[Location, Optional[Location]] = {}
    cost_so_far: Dict[Location, float] = {}
    came_from[start] = None
    cost_so_far[start] = 0

    while not frontier.empty():
        current: Location = frontier.get()

        if current == goal:
            break

        for next in graph.neighbors(current):
            new_cost = cost_so_far[current] + graph.cost(current, next)
            if next not in cost_so_far or new_cost < cost_so_far[next]:
                cost_so_far[next] = new_cost
                priority = new_cost + heuristic(next, goal)
                frontier.put(next, priority)
                came_from[next] = current

    return came_from, cost_so_far


start, goal = (1, 4), (8, 3)
came_from, cost_so_far = a_star_search(diagram4, start, goal)
draw_grid(diagram4, point_to=came_from, start=start, goal=goal)
draw_grid(diagram4, path=reconstruct_path(came_from, start=start, goal=goal))

______________________________
 ↓  ↓  ↓  ↓  ←  ←  ←  ←  ←  ← 
 ↓  ↓  ↓  ↓  ←  ↑  ↑  ←  ←  ← 
 ↓  ↓  ↓  ↓  ←  ←  ↑  ↑  ←  ← 
 →  ↓  ←  ←  ←  ←  .  ↑  Z  . 
 →  A  ←  ←  ←  .  .  .  .  . 
 ↑  ↑  ↑  ←  ←  .  .  .  .  . 
 ↑  ↑  ↑  ←  ←  .  .  .  .  . 
 ↑ ######### .  .  .  .  .  . 
 . ######### .  .  .  .  .  . 
 .  .  .  .  .  .  .  .  .  . 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
______________________________
 .  .  .  @  @  @  @  .  .  . 
 .  .  .  @  .  .  @  @  .  . 
 .  .  .  @  .  .  .  @  @  . 
 .  @  @  @  .  .  .  .  @  . 
 .  @  .  .  .  .  .  .  .  . 
 .  .  .  .  .  .  .  .  .  . 
 .  .  .  .  .  .  .  .  .  . 
 . ######### .  .  .  .  .  . 
 . ######### .  .  .  .  .  . 
 .  .  .  .  .  .  .  .  .  . 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~


In [143]:
draw_grid(diagram4, number=cost_so_far, start=start, goal=goal)

______________________________
 5  4  5  6  7  8  9  10 11 12
 4  3  4  5  10 13 10 11 12 13
 3  2  3  4  9  14 15 12 13 14
 2  1  2  3  8  13 .  17 Z  . 
 1  A  1  6  11 .  .  .  .  . 
 2  1  2  7  12 .  .  .  .  . 
 3  2  3  4  9  .  .  .  .  . 
 4 ######### .  .  .  .  .  . 
 . ######### .  .  .  .  .  . 
 .  .  .  .  .  .  .  .  .  . 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~


In [172]:
from math import inf
from collections import defaultdict
from heapq import heappop, heappush


def heuristic(a: GridLocation, b: GridLocation) -> float:
    (x1, y1) = a
    (x2, y2) = b
    return abs(x1 - x2) + abs(y1 - y2)


def a_star_alt(graph: WeightedGraph, start: Location, goal: Location):
    # A star with defaultdict and native heapq instead of PriorityQueue
    frontier = []
    heappush(frontier, (0, start))
    came_from: Dict[Location, Optional[Location]] = {}
    cost_so_far: Dict[Location, float] = defaultdict(lambda: inf)
    came_from[start] = None
    cost_so_far[start] = 0

    while frontier:
        current: Location = heappop(frontier)[1]

        if current == goal:
            break

        for next in graph.neighbors(current):
            new_cost = cost_so_far[current] + graph.cost(current, next)
            if new_cost < cost_so_far[next]:
                cost_so_far[next] = new_cost
                priority = new_cost + heuristic(next, goal)
                heappush(frontier, (priority, next))
                came_from[next] = current

    return came_from, cost_so_far


start, goal = (1, 4), (8, 3)
came_from, cost_so_far = a_star_alt(diagram4, start, goal)
draw_grid(diagram4, point_to=came_from, start=start, goal=goal)
draw_grid(diagram4, path=reconstruct_path(came_from, start=start, goal=goal))

______________________________
 ↓  ↓  ↓  ↓  ←  ←  ←  ←  ←  ← 
 ↓  ↓  ↓  ↓  ←  ↑  ↑  ←  ←  ← 
 ↓  ↓  ↓  ↓  ←  ←  ↑  ↑  ←  ← 
 →  ↓  ←  ←  ←  ←  .  ↑  Z  . 
 →  A  ←  ←  ←  .  .  .  .  . 
 ↑  ↑  ↑  ←  ←  .  .  .  .  . 
 ↑  ↑  ↑  ←  ←  .  .  .  .  . 
 ↑ ######### .  .  .  .  .  . 
 . ######### .  .  .  .  .  . 
 .  .  .  .  .  .  .  .  .  . 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
______________________________
 .  .  .  @  @  @  @  .  .  . 
 .  .  .  @  .  .  @  @  .  . 
 .  .  .  @  .  .  .  @  @  . 
 .  @  @  @  .  .  .  .  @  . 
 .  @  .  .  .  .  .  .  .  . 
 .  .  .  .  .  .  .  .  .  . 
 .  .  .  .  .  .  .  .  .  . 
 . ######### .  .  .  .  .  . 
 . ######### .  .  .  .  .  . 
 .  .  .  .  .  .  .  .  .  . 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~


In [3]:
from math import inf
from collections import defaultdict
from heapq import heappop, heappush

def a_star(start, goal):
    """
    A* search from state `start` to `goal`.
    Need to provide functions: `heuristic`, `cost`, and `moves`
    """
    frontier = []
    heappush(frontier, (0, start))
    came_from = {}
    cost_so_far = defaultdict(lambda: inf)
    came_from[start] = None
    cost_so_far[start] = 0

    while frontier:
        current = heappop(frontier)[1]
        if heuristic(current, goal) == 0:
            break
        for next in moves(current):
            new_cost = cost_so_far[current] + cost(current, next)
            if new_cost < cost_so_far[next]:
                cost_so_far[next] = new_cost
                priority = new_cost + heuristic(next, goal)
                heappush(frontier, (priority, next))
                came_from[next] = current

    return came_from, cost_so_far, current


# These needs to be implemented for each individual problem
def heuristic(current, goal) -> float:
    "Minimum estimation of cost to get from a to goal. Also used to determine when we have reached goal (heuristic returns 0)."
    # This implements cab distance from a to goal. State is (x,y) for a point.
    (x1, y1) = current
    (x2, y2) = goal
    return abs(x1 - x2) + abs(y1 - y2)


def cost(current, next):
    return 1


def moves(current):
    pass

