Copyright **`(c)`** 2024 Giovanni Squillero `<giovanni.squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free under certain conditions — see the [`license`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  

# n-puzzle problem


In [56]:
RANDOM_SEED = 1242
SIZE = 3

In [57]:
import logging
from random import seed, choice
from typing import Callable
import numpy as np
from queue import PriorityQueue
logging.basicConfig(format="%(message)s", level=logging.INFO)

In [58]:
class State:
    def __init__(self, data: np.ndarray):
        self._data = data.copy()
        self._data.flags.writeable = False

    def __hash__(self):
        return hash(bytes(self._data))

    def __eq__(self, other):
        return bytes(self._data) == bytes(other._data)

    def __lt__(self, other):
        return bytes(self._data) < bytes(other._data)

    def __str__(self):
        return str(self._data)

    def __repr__(self):
        return repr(self._data)

    @property
    def data(self):
        return self._data

    def copy_data(self):
        return self._data.copy()

## Search Algorithm (global search)

In [59]:
def search(
    initial_state: State,
    goal_test: Callable,
    parent_state: dict,
    state_cost: dict,
    priority_function: Callable,
    unit_cost: Callable,
):
    frontier = PriorityQueue() 
    parent_state.clear()      
    state_cost.clear()          

    state = initial_state       # start from the initial state
    parent_state[state] = None  # the initial state has no parent
    state_cost[state] = 0       # cost to reach the initial state is zero

    while state is not None and not goal_test(state):  # process states until goal is reached or no states remain
        for a in possible_actions(state):              # iterate over all possible actions from the current state
            new_state = result(state, a)               # determine the resulting state after applying the action
            cost = unit_cost(a)                        # compute the cost of this action
            if new_state not in state_cost and new_state not in frontier.queue:  
                parent_state[new_state] = state  # if the state is new: set the current state as its parent, calculate the cumulative cost and add it to the frontier with its priority                                
                state_cost[new_state] = state_cost[state] + cost 
                frontier.put(new_state, priority_function(new_state))  
                logging.debug(f"Added new node to frontier (cost={state_cost[new_state]})")
            elif new_state in frontier.queue and state_cost[new_state] > state_cost[state] + cost:  
                old_cost = state_cost[new_state] # if the state is already in the frontier with a higher cost: store the old cost for logging, update its parent to the current state and update its cumulative cost
                parent_state[new_state] = state 
                state_cost[new_state] = state_cost[state] + cost  
                logging.debug(f"Updated node cost in frontier: {old_cost} -> {state_cost[new_state]}")
        if frontier:  # if there are more states to process in the frontier get the state with the highest priority
            state = frontier.get()  
        else:
            state = None

    path = list()  # initialize the solution path
    s = state
    while s:                        # reconstruct the path from the goal state to the initial state
        path.append(s.copy_data())  # copy state data to avoid modifying the original state
        s = parent_state[s]         # move to the parent state

    logging.info(f"Found a solution in {len(path):,} steps; visited {len(state_cost):,} states") 
    return list(reversed(path))  # return the path in the correct order, from initial to goal state


## Graph search for the the n-puzzle

In [60]:
seed(RANDOM_SEED)

GOAL = State(np.array(list(range(1, SIZE**2)) + [0]).reshape((SIZE, SIZE)))
logging.info(f"Goal:\n{GOAL}")

def goal_test(state):
    return state == GOAL

# (R, C) -> UP / RIGHT / DOWN / LEFT
MOVES = [np.array(_) for _ in [(-1, 0), (0, +1), (+1, 0), (0, -1)]]

def find_empty_space(board: np.ndarray):
    t = np.where(board == 0)
    return np.array([t[0][0], t[1][0]])

def is_valid(board: np.ndarray, action):
    return all(0 <= (find_empty_space(board) + action)[i] < board.shape[i] for i in [0, 1])

def possible_actions(state: State):
    return (m for m in MOVES if is_valid(state._data, m))

def result(state, action):
    board = state.copy_data()
    space = find_empty_space(board)
    pos = space + action
    board[space[0], space[1]] = board[pos[0], pos[1]]
    board[pos[0], pos[1]] = 0
    return State(board)

INITIAL_STATE = GOAL
for r in range(5_000):
    INITIAL_STATE = result(INITIAL_STATE, choice(list(possible_actions(INITIAL_STATE))))

print("\nInitial State:")
print(INITIAL_STATE)

Goal:
[[1 2 3]
 [4 5 6]
 [7 8 0]]



Initial State:
[[1 8 4]
 [6 5 2]
 [7 3 0]]


## A*

In [None]:
parent_state = dict()  # dictionary to store the parent of each state
state_cost = dict()    # dictionary to store the cumulative cost for each state

def h(state): # heuristic function: number of misplaced tiles (ignoring empty tiles)
    return np.sum((state._data != GOAL._data) & (state._data > 0))

final = search(
    INITIAL_STATE,                                     # starting state of the search
    goal_test=goal_test,                               # function to test if the goal state is reached
    parent_state=parent_state,                         # pass the parent_state dictionary for tracking
    state_cost=state_cost,                             # pass the state_cost dictionary for tracking
    priority_function=lambda s: state_cost[s] + h(s),  # combine the cost-so-far and the heuristic value
    unit_cost=lambda a: 1,                             # define a uniform unit cost for all actions
)

Found a solution in 143 steps; visited 6,654 states


## Breadth-First

In [None]:
parent_state = dict()  # dictionary to store the parent of each state
state_cost = dict()    # dictionary to store the cumulative cost for each state

final = search(
    INITIAL_STATE,                                # starting state of the search
    goal_test=goal_test,                          # function to test if the goal state is reached
    parent_state=parent_state,                    # pass the parent_state dictionary for tracking
    state_cost=state_cost,                        # pass the state_cost dictionary for tracking
    priority_function=lambda s: len(state_cost),  # use a constant priority for breadth-first (queue behavior)
    unit_cost=lambda a: 1,                        # define a uniform unit cost for all actions
)

Found a solution in 143 steps; visited 6,654 states
