Copyright **`(c)`** 2024 Giovanni Squillero `<giovanni.squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free under certain conditions — see the [`license`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  

In [None]:
from collections import namedtuple
from random import choice
from tqdm.auto import tqdm
from typing import Callable
from math import sqrt
from operator import itemgetter
import logging
import heapq
import numpy as np

# Emit INFO-level records so the summary line logged by `search` is shown;
# the per-node DEBUG messages stay hidden at this level.
logging.basicConfig(level=logging.INFO)

In [None]:
# Side length of the square puzzle board (3 -> the 8-puzzle).
PUZZLE_DIM = 3

# A move swaps the contents of the two board cells `pos1` and `pos2`,
# each given as a (row, column) tuple.
Action = namedtuple('Action', ['pos1', 'pos2'])

# Lowercase alias kept because the rest of the notebook builds moves with
# `action(...)`; binding `Action` as well makes the 'Action' annotations
# used in function signatures resolve to a real name.
action = Action

In [None]:
def available_actions(state: np.ndarray) -> list['Action']:
    """Return every legal move of the blank tile on the board `state`.

    The blank is the cell holding 0. Each returned action pairs the blank's
    position (`pos1`) with the position of an adjacent cell (`pos2`).
    Moves are listed in the fixed order: row - 1, row + 1, column - 1,
    column + 1, skipping those that would leave the board.

    The board limits are taken from `state.shape`, so the function works
    for any 2-D board rather than only for PUZZLE_DIM x PUZZLE_DIM ones.

    Raises:
        IndexError: if `state` contains no 0 cell.
    """
    n_rows, n_cols = state.shape
    # np.where yields one index array per axis; element 0 of each is the
    # coordinate of the first zero found.
    x, y = (int(axis[0]) for axis in np.where(state == 0))

    actions = []
    if x > 0:
        actions.append(action((x, y), (x - 1, y)))
    if x < n_rows - 1:
        actions.append(action((x, y), (x + 1, y)))
    if y > 0:
        actions.append(action((x, y), (x, y - 1)))
    if y < n_cols - 1:
        actions.append(action((x, y), (x, y + 1)))
    return actions



def do_action(state: np.ndarray, action: 'Action') -> np.ndarray:
    """Return a copy of `state` with the two cells named by `action` swapped.

    `state` itself is left untouched; `action.pos1` and `action.pos2` are
    used directly as indices into the board.
    """
    first, second = action.pos1, action.pos2
    swapped = state.copy()
    # Read both values from the untouched input, then write them crosswise.
    value_at_first = state[first]
    value_at_second = state[second]
    swapped[first] = value_at_second
    swapped[second] = value_at_first
    return swapped

In [None]:
# Number of random legal moves applied to the solved board.
RANDOMIZE_STEPS = 100_000

# Begin from the solved layout (1, 2, ..., blank last) and scramble it using
# legal moves only.
state = np.array([*range(1, PUZZLE_DIM**2), 0]).reshape((PUZZLE_DIM, PUZZLE_DIM))
for step in tqdm(range(RANDOMIZE_STEPS), desc='Randomizing'):
    legal_moves = available_actions(state)
    state = do_action(state, choice(legal_moves))
state

In [None]:
class State:
    """Hashable, orderable wrapper around a board array.

    numpy arrays cannot be used as dict keys or compared inside heap
    tuples, so hashing, equality and ordering are all defined on the raw
    bytes of the wrapped array. The array is stored by reference (not
    copied); mutating it after the State has been used as a key changes
    its hash.
    """

    def __init__(self, data: np.ndarray):
        self._data = data

    def __hash__(self):
        return hash(bytes(self._data))

    def __eq__(self, other):
        # Let Python fall back to its default handling for foreign types
        # (e.g. `state == None` is False) instead of raising AttributeError.
        if not isinstance(other, State):
            return NotImplemented
        return bytes(self._data) == bytes(other._data)

    def __lt__(self, other):
        # Ordering only serves as a tie-breaker between heap entries with
        # equal priority; it is a byte-wise comparison, not a puzzle metric.
        if not isinstance(other, State):
            return NotImplemented
        return bytes(self._data) < bytes(other._data)

    def __str__(self):
        return str(self._data)

    def __repr__(self):
        return repr(self._data)

    @property
    def data(self):
        """The wrapped array itself (not a copy)."""
        return self._data

    def copy_data(self):
        """Return an independent copy of the wrapped array."""
        return self._data.copy()

In [None]:
# Starting point of every search below: the scrambled board, wrapped so it
# can be hashed and compared.
init_state = State(state)

# Solved layout: tiles 1 .. PUZZLE_DIM**2 - 1 in row-major order, blank (0) last.
GOAL = State(np.array([*range(1, PUZZLE_DIM**2), 0]).reshape((PUZZLE_DIM, PUZZLE_DIM)))

def goal_test(state: State):
    """Return whether `state` equals the solved board `GOAL`."""
    reached = state == GOAL
    return reached

In [None]:
def search(
    initial_state: State,
    goal_test: Callable,
    parent_state: dict,
    state_cost: dict,
    priority_function: Callable,
    max_cost = None
):
    """Generic best-first search driven by `priority_function`.

    Args:
        initial_state: state the search starts from.
        goal_test: callable returning a truthy value for a goal state.
        parent_state: dict that is cleared and then filled in place with
            ``state -> predecessor`` (``None`` for the initial state).
        state_cost: dict that is cleared and then filled in place with
            ``state -> number of moves from the initial state``. It is
            updated *before* `priority_function` is called, so the priority
            function may read it.
        priority_function: maps a newly discovered state to its heap
            priority; the state with the smallest priority is expanded next.
        max_cost: optional depth limit; states whose cost would exceed it
            are not generated. ``None`` means no limit.

    Returns:
        The list of board arrays from the initial state to the goal
        (both included), or an empty list when no goal state was reached.
    """
    frontier = []
    # Mirror of the states currently stored in the heap, giving O(1)
    # membership tests instead of rebuilding a list for every neighbour.
    frontier_states = set()
    parent_state.clear()
    state_cost.clear()

    state = initial_state
    parent_state[state] = None
    state_cost[state] = 0

    while state is not None and not goal_test(state):
        next_cost = state_cost[state] + 1
        # limit diameter: the check does not depend on the action, so it is
        # done once per expanded state.
        if max_cost is None or next_cost <= max_cost:
            for a in available_actions(state.data):
                new_state = State(do_action(state.data, a))
                # Every state ever pushed on the frontier is also a key of
                # state_cost, so this single lookup covers both checks.
                if new_state not in state_cost:
                    parent_state[new_state] = state
                    state_cost[new_state] = next_cost
                    # tuple in heap is necessary for custom priority
                    heapq.heappush(frontier, (priority_function(new_state), new_state))
                    frontier_states.add(new_state)
                    logging.debug("Added new node to frontier (cost=%s)", next_cost)
                elif new_state in frontier_states and state_cost[new_state] > next_cost:
                    old_cost = state_cost[new_state]
                    parent_state[new_state] = state
                    state_cost[new_state] = next_cost
                    # NOTE: the heap entry keeps the priority computed when the
                    # node was first pushed; only cost and parent are updated.
                    logging.debug("Updated node cost in frontier: %s -> %s", old_cost, next_cost)
        if frontier:
            _, state = heapq.heappop(frontier)
            frontier_states.discard(state)
        else:
            state = None

    # Walk the predecessor links back from the reached state to the start.
    path = list()
    s = state
    while s is not None:
        path.append(s.copy_data())
        s = parent_state[s]

    if path:
        # The path holds states, start included, so it has one move fewer.
        logging.info(f"Found a solution with {len(path) - 1:,} actions; visited {len(state_cost):,} states")
    else:
        logging.warning(f"No solution found; visited {len(state_cost):,} states")
    return list(reversed(path))


## Breadth-First

In [None]:
# Fresh bookkeeping dictionaries; `search` fills them in place.
parent_state = {}
state_cost = {}

# The priority is the number of states discovered so far, so it grows with
# every insertion and the heap hands states back in discovery order.
final = search(
    init_state,
    goal_test=goal_test,
    parent_state=parent_state,
    state_cost=state_cost,
    priority_function=lambda _state: len(state_cost),
)
final

## Depth-First

In [None]:
# Fresh bookkeeping dictionaries; `search` fills them in place.
parent_state = {}
state_cost = {}

# Negating the discovery counter makes the most recently discovered state
# the smallest heap entry, so it is expanded first.
final = search(
    init_state,
    goal_test=goal_test,
    parent_state=parent_state,
    state_cost=state_cost,
    priority_function=lambda _state: -len(state_cost),
    # max_cost=40
)
final

## A*

In [None]:
# Fresh bookkeeping dictionaries; `search` fills them in place.
parent_state = {}
state_cost = {}

def h_euclidean(s: np.ndarray) -> float:
    """Sum of straight-line distances of every tile from its solved cell.

    The solved layout is the one used for the goal board: tiles 1, 2, ...
    in row-major order with the blank (0) in the last cell, so tile `v`
    belongs at flat index `v - 1`. The blank is not counted.

    The row width is taken from `s` itself, so any 2-D board is accepted.
    """
    n_cols = s.shape[1]
    dist = 0.0
    for (i, j), val in np.ndenumerate(s):
        if val != 0:
            # Tile v sits at flat index v - 1 in the solved board.
            goal_row, goal_col = divmod(int(val) - 1, n_cols)
            dx = abs(i - goal_row)
            dy = abs(j - goal_col)
            dist += sqrt(dx**2 + dy**2)
    return dist

def h_manhattan(s: np.ndarray) -> int:
    """Sum of Manhattan distances of every tile from its solved cell.

    The solved layout is the one used for the goal board: tiles 1, 2, ...
    in row-major order with the blank (0) in the last cell, so tile `v`
    belongs at flat index `v - 1`. The blank is not counted.

    The row width is taken from `s` itself, so any 2-D board is accepted.
    """
    n_cols = s.shape[1]
    dist = 0
    for (i, j), val in np.ndenumerate(s):
        if val != 0:
            # Tile v sits at flat index v - 1 in the solved board.
            goal_row, goal_col = divmod(int(val) - 1, n_cols)
            dist += abs(i - goal_row) + abs(j - goal_col)
    return dist

def h_tile_not_in_place(s: np.ndarray) -> int:
    """Count the tiles of board `s` that are not on their solved cell.

    The solved layout is tiles 1, 2, ... in row-major order with the
    blank (0) in the last cell. The blank is never counted.

    The count is computed from the argument `s`; the solved layout is
    rebuilt from the board's own shape.
    """
    # 1, 2, ..., size - 1, 0 laid out with the same shape as the board.
    solved = ((np.arange(s.size) + 1) % s.size).reshape(s.shape)
    return int(np.sum((s != solved) & (s > 0)))

# A*: rank each discovered state by the heuristic estimate of the remaining
# distance plus the number of moves already made to reach it.
final = search(
    init_state,
    goal_test=goal_test,
    parent_state=parent_state,
    state_cost=state_cost,
    priority_function=lambda node: (h_euclidean(node.data) + state_cost[node]),
)

INFO:root:Found a solution with 19 actions; visited 31,136 states
