In [1]:
from collections import namedtuple
from random import choice
from tqdm.auto import tqdm
import numpy as np
from random import shuffle

  from .autonotebook import tqdm as notebook_tqdm


### Utility Functions
- avaiable_actions: retrive possible actions from a specific state
- do_action: executes de action
- is_solvable_classic: check if a puzzle is solvable
- generate_solvable_puzzle_classic: generate a solvable puzzle (not used to generate the puzzle in this exercise)

In [2]:
# Definizione di `action`
PUZZLE_DIM = 4
action = namedtuple('Action', ['pos1', 'pos2'])

# Genera tutte le mosse valide per lo stato corrente
def available_actions(state: np.ndarray) -> list['Action']:
    x, y = [int(_[0]) for _ in np.where(state == 0)]
    actions = []
    if x > 0:
        actions.append(action((x, y), (x - 1, y)))  # Sposta in alto
    if x < PUZZLE_DIM - 1:
        actions.append(action((x, y), (x + 1, y)))  # Sposta in basso
    if y > 0:
        actions.append(action((x, y), (x, y - 1)))  # Sposta a sinistra
    if y < PUZZLE_DIM - 1:
        actions.append(action((x, y), (x, y + 1)))  # Sposta a destra
    return actions

# Esegui una mossa e restituisci il nuovo stato
def do_action(state: np.ndarray, action: 'Action') -> np.ndarray:
    new_state = state.copy()  # Evita modifiche in-place
    new_state[action.pos1], new_state[action.pos2] = new_state[action.pos2], new_state[action.pos1]
    return new_state

# Verifica se un puzzle è risolvibile
def is_solvable_classic(puzzle: np.ndarray) -> bool:
    # Trova la posizione dello 0
    zero_pos = np.where(puzzle == 0)
    zero_row, _ = zero_pos[0][0], zero_pos[1][0]  # Ottieni la riga dello 0 (base 0)

    # Appiattisci il puzzle escludendo lo zero
    flat = puzzle.flatten()
    flat = flat[flat != 0]  # Rimuovi lo zero

    # Conta il numero di inversioni
    inversions = 0
    for i in range(len(flat)):
        for j in range(i + 1, len(flat)):
            if flat[i] > flat[j]:
                inversions += 1

    # Verifica la risolvibilità
    n = puzzle.shape[0]  # Dimensione del puzzle (es: 4 per 4x4)
    if n % 2 != 0:  # Dimensioni dispari
        # Per puzzle dispari, il numero di inversioni deve essere pari
        return inversions % 2 == 0
    else:  # Dimensioni pari
        # Per puzzle pari, verifica (inversioni + riga dello 0) % 2 == 0
        # Usa zero_row + 1 perché la riga deve essere 1-indicizzata
        return (inversions + (zero_row + 1)) % 2 == 0

# Genera un puzzle risolvibile
def generate_solvable_puzzle_classic():
    while True:
        # Genera un puzzle casuale
        puzzle = np.array([i for i in range(1, PUZZLE_DIM**2)] + [0])
        shuffle(puzzle)
        puzzle = puzzle.reshape(PUZZLE_DIM, PUZZLE_DIM)

        # Verifica se è risolvibile
        if is_solvable_classic(puzzle):
            return puzzle


### Heuristic Function
The Linear Conflict Heuristic enhances the Manhattan Distance by adding a penalty for tiles in the same row or column that block each other from reaching their goal positions.

##### Manhattan Distance:
Calculates the sum of the vertical and horizontal distances between each tile and its target position.

##### Linear Conflict:
1) Identifies tiles in row conflicts:
Two tiles in the same row are in conflict if they need to cross each other to reach their goal positions.
2) Identifies tiles in column conflicts:
Two tiles in the same column are in conflict if they need to cross each other to reach their goal positions.
For every conflict found, adds a penalty of 2 (since at least two moves are required to resolve the conflict).

##### Final Result:

Returns the sum of the Manhattan Distance and twice the number of linear conflicts.
This makes the heuristic more informed than using Manhattan Distance alone, as it better estimates the cost to solve the puzzle

In [3]:
def heuristic_linear_conflict(state: np.ndarray) -> int:
    PUZZLE_DIM = state.shape[0]
    goal_state = np.array([i for i in range(1, PUZZLE_DIM**2)] + [0]).reshape(PUZZLE_DIM, PUZZLE_DIM)
    
    # Manhattan distance
    manhattan_distance = 0
    linear_conflict = 0

    for i in range(PUZZLE_DIM):
        for j in range(PUZZLE_DIM):
            value = state[i, j]
            if value != 0:  # Ignora lo spazio vuoto
                goal_i, goal_j = divmod(value - 1, PUZZLE_DIM)
                manhattan_distance += abs(i - goal_i) + abs(j - goal_j)

                # Conflitti lineari nella riga
                if i == goal_i:
                    for k in range(j + 1, PUZZLE_DIM):
                        other_value = state[i, k]
                        if other_value != 0:
                            other_goal_i, other_goal_j = divmod(other_value - 1, PUZZLE_DIM)
                            if other_goal_i == i and other_goal_j < goal_j:
                                linear_conflict += 1

                # Conflitti lineari nella colonna
                if j == goal_j:
                    for k in range(i + 1, PUZZLE_DIM):
                        other_value = state[k, j]
                        if other_value != 0:
                            other_goal_i, other_goal_j = divmod(other_value - 1, PUZZLE_DIM)
                            if other_goal_j == j and other_goal_i < goal_i:
                                linear_conflict += 1

    return manhattan_distance + 2 * linear_conflict

### Path Finding Algorithm
The bidirectional_a_star function performs a bidirectional search using the A* algorithm, with two search fronts:

Forward search: Starts from the initial state and moves towards the goal state.
Backward search: Starts from the goal state and moves towards the initial state.
The search terminates when the two fronts meet at a common state (intersection).



In [4]:
from heapq import heappush, heappop
def bidirectional_a_star(start_state: np.ndarray):
    PUZZLE_DIM = start_state.shape[0]
    goal_state = np.array([i for i in range(1, PUZZLE_DIM**2)] + [0]).reshape(PUZZLE_DIM, PUZZLE_DIM)

    # Coda per la ricerca in avanti
    forward_queue = []
    heappush(forward_queue, (0 + heuristic_linear_conflict(start_state), 0, start_state.tobytes(), [start_state]))
    forward_visited = {start_state.tobytes(): (0, [start_state])}  # Stato -> (costo g, percorso)

    # Coda per la ricerca all'indietro
    backward_queue = []
    heappush(backward_queue, (0 + heuristic_linear_conflict(goal_state), 0, goal_state.tobytes(), [goal_state]))
    backward_visited = {goal_state.tobytes(): (0, [goal_state])}  # Stato -> (costo g, percorso)

    cont = 0  # Contatore degli stati visitati

    while forward_queue and backward_queue:
        # Espandi dalla ricerca in avanti
        f_fwd, g_fwd, current_fwd_bytes, path_fwd = heappop(forward_queue)
        current_fwd = np.frombuffer(current_fwd_bytes, dtype=start_state.dtype).reshape(start_state.shape)

        # Controlla intersezione con la ricerca all'indietro
        if current_fwd_bytes in backward_visited:
            print(f"Intersection found after {cont} been visited!")
            print(f"Cost: {cont} node evaluated")
            backward_cost, backward_path = backward_visited[current_fwd_bytes]
            return path_fwd + list(reversed(backward_path))

        # Genera successori per la ricerca in avanti
        for move in available_actions(current_fwd):
            next_state = do_action(current_fwd, move)
            next_state_bytes = next_state.tobytes()
            next_g = g_fwd + 1
            if next_state_bytes not in forward_visited or forward_visited[next_state_bytes][0] > next_g:
                forward_visited[next_state_bytes] = (next_g, path_fwd + [next_state])
                heappush(forward_queue, (next_g + heuristic_linear_conflict(next_state), next_g, next_state_bytes, path_fwd + [next_state]))

        # Espandi dalla ricerca all'indietro
        f_bwd, g_bwd, current_bwd_bytes, path_bwd = heappop(backward_queue)
        current_bwd = np.frombuffer(current_bwd_bytes, dtype=start_state.dtype).reshape(start_state.shape)

        # Controlla intersezione con la ricerca in avanti
        if current_bwd_bytes in forward_visited:
            print(f"Intersection found after {cont} been visited!")
            print(f"Cost: {cont} node evaluated")
            forward_cost, forward_path = forward_visited[current_bwd_bytes]
            return forward_path + list(reversed(path_bwd))

        # Genera successori per la ricerca all'indietro
        for move in available_actions(current_bwd):
            next_state = do_action(current_bwd, move)
            next_state_bytes = next_state.tobytes()
            next_g = g_bwd + 1
            if next_state_bytes not in backward_visited or backward_visited[next_state_bytes][0] > next_g:
                backward_visited[next_state_bytes] = (next_g, path_bwd + [next_state])
                heappush(backward_queue, (next_g + heuristic_linear_conflict(next_state), next_g, next_state_bytes, path_bwd + [next_state]))

        cont += 1

    print(f"Unsolvable puzzle. States visited: {cont}")
    return None


### Execute!

Create an initial state with random permutations of the solved puzzle, apply the algorithm to solve it, and print the moves executed along with the intermediate states.

In [None]:
# Genera uno stato iniziale
RANDOMIZE_STEPS = 5000
state = np.array([i for i in range(1, PUZZLE_DIM**2)] + [0]).reshape((PUZZLE_DIM, PUZZLE_DIM))
print("Initial puzzle:")
print(state)

# Randomizza il puzzle
for r in tqdm(range(RANDOMIZE_STEPS), desc='Randomizing'):
    state = do_action(state, choice(available_actions(state)))

print("Puzzle randomized:")
print(state)
print("Is solvable?", is_solvable_classic(state))
# Verifica risolvibilità
if not is_solvable_classic(state):
    print("The generated puzzle is not solvable.")
else:
    print("The puzzle is solvable. Solving with bidirectional search...")

    # Risolvi il puzzle con ricerca bidirezionale
    solution_states = bidirectional_a_star(state)

    if solution_states is not None:
        print("Solution found")
        print(f"Puzzle solved in {len(solution_states) - 1} moves!")
        print(f"Quality: {len(solution_states) - 1} moves!")
        print("Solution path:")
        # Stampa tutti gli stati nel percorso della soluzione
        for step, state in enumerate(solution_states):
            print(f"Step {step}:")
            print(state)
            print("-" * 20)
    else:
        print("Puzzle unsolvable.")

Initial puzzle:
[[ 1  2  3  4]
 [ 5  6  7  8]
 [ 9 10 11 12]
 [13 14 15  0]]


Randomizing:   0%|          | 0/5000 [00:00<?, ?it/s]

Randomizing: 100%|██████████| 5000/5000 [00:00<00:00, 54620.02it/s]

Puzzle randomized:
[[15  6  8  9]
 [ 4  7 13  1]
 [11 10  0 14]
 [12  3  5  2]]
Is solvable? True
The puzzle is solvable. Solving with bidirectional search...





Intersection found after 384232 been visited!
Cost: 384232 node evaluated
Solution found
Puzzle solved in 73 moves!
Quality: 73 moves!
Solution path:
Step 0:
[[15  6  8  9]
 [ 4  7 13  1]
 [11 10  0 14]
 [12  3  5  2]]
--------------------
Step 1:
[[15  6  8  9]
 [ 4  7  0  1]
 [11 10 13 14]
 [12  3  5  2]]
--------------------
Step 2:
[[15  6  8  9]
 [ 4  0  7  1]
 [11 10 13 14]
 [12  3  5  2]]
--------------------
Step 3:
[[15  6  8  9]
 [ 0  4  7  1]
 [11 10 13 14]
 [12  3  5  2]]
--------------------
Step 4:
[[ 0  6  8  9]
 [15  4  7  1]
 [11 10 13 14]
 [12  3  5  2]]
--------------------
Step 5:
[[ 6  0  8  9]
 [15  4  7  1]
 [11 10 13 14]
 [12  3  5  2]]
--------------------
Step 6:
[[ 6  4  8  9]
 [15  0  7  1]
 [11 10 13 14]
 [12  3  5  2]]
--------------------
Step 7:
[[ 6  4  8  9]
 [15 10  7  1]
 [11  0 13 14]
 [12  3  5  2]]
--------------------
Step 8:
[[ 6  4  8  9]
 [15 10  7  1]
 [11  3 13 14]
 [12  0  5  2]]
--------------------
Step 9:
[[ 6  4  8  9]
 [15 10  7  1]
 [