Copyright **`(c)`** 2024 Giovanni Squillero `<giovanni.squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free under certain conditions — see the [`license`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  

In [None]:
from collections import namedtuple,deque
from heapq import heappush, heappop
from random import choice
from tqdm.auto import tqdm
import numpy as np

In [None]:
# Side length of the square board; the goal layout built below has
# PUZZLE_DIM**2 cells (tiles 1..PUZZLE_DIM**2 - 1 plus the blank, 0).
PUZZLE_DIM = 6
# A move is a pair of board coordinates whose contents get swapped.
# available_actions always puts the blank's cell in pos1 and the neighbour in pos2.
action = namedtuple('Action', ['pos1', 'pos2'])

In [None]:
def available_actions(state: np.ndarray) -> list['Action']:
    """Return the legal moves of the blank tile (value 0) on a 2-D board.

    Each move is an ``action(pos1, pos2)`` where ``pos1`` is the blank's cell
    and ``pos2`` is the neighbouring cell it is swapped with. Moves are listed
    in the fixed order: row - 1, row + 1, column - 1, column + 1.

    The board limits are taken from ``state.shape`` rather than from the
    global ``PUZZLE_DIM``, so the function works for any board size.

    Raises:
        IndexError: if the board contains no blank (0) cell.
    """
    rows, cols = state.shape
    # np.where yields one index array per axis; the first hit is the blank.
    x, y = (int(axis[0]) for axis in np.where(state == 0))
    blank = (x, y)
    neighbours = ((x - 1, y), (x + 1, y), (x, y - 1), (x, y + 1))
    return [
        action(blank, (row, col))
        for row, col in neighbours
        if 0 <= row < rows and 0 <= col < cols
    ]



def do_action(state: np.ndarray, action: 'Action') -> np.ndarray:
    """Return a copy of ``state`` with the cells ``action.pos1`` and ``action.pos2`` swapped.

    The input board is left untouched.
    """
    first, second = action.pos1, action.pos2
    board = state.copy()
    # Read both cells before writing either, so the swap is a true exchange.
    value_at_second = board[second]
    value_at_first = board[first]
    board[first] = value_at_second
    board[second] = value_at_first
    return board

In [None]:
# Number of random legal moves applied to the solved board to shuffle it.
RANDOMIZE_STEPS = 100_000
# Start from the solved layout: 1..PUZZLE_DIM**2 - 1 in row-major order, blank (0) last.
state = np.array([i for i in range(1, PUZZLE_DIM**2)] + [0]).reshape((PUZZLE_DIM, PUZZLE_DIM))
# Shuffling by legal moves only keeps the resulting board reachable from the solved one.
for r in tqdm(range(RANDOMIZE_STEPS), desc='Randomizing'):
    state = do_action(state, choice(available_actions(state)))
# Bare expression: shown as the cell output in the notebook.
state

In [None]:
#Trying to solve the puzzle 
def is_solved(state: np.ndarray) -> bool:
    """Return True when ``state`` is the solved layout for its own shape.

    The solved layout is 1, 2, ..., n - 1 in row-major order followed by the
    blank (0) in the last cell, where n is the number of cells. The goal is
    derived from ``state`` itself, so the check does not depend on the global
    ``PUZZLE_DIM``.
    """
    state = np.asarray(state)
    # arange(n) rolled left by one is [1, 2, ..., n - 1, 0].
    goal = np.roll(np.arange(state.size), -1).reshape(state.shape)
    return bool(np.array_equal(state, goal))

#Solve the puzzle with path search
#We can introduce pruning: if the last move undoes the previous one, we can skip it 
def solve(state: np.ndarray) -> list['Action']:
    """Breadth-first search for a sequence of moves that solves ``state``.

    States are explored in FIFO order, so the first solution found uses the
    fewest moves. Every board is recorded in ``visited`` (keyed by its raw
    bytes) the moment it is queued, which means no board is queued twice and
    a move that merely undoes the previous one is discarded automatically.

    Returns:
        The list of actions leading from ``state`` to the solved board, or
        ``None`` if the queue empties without reaching it.
    """
    visited = {state.tobytes()}
    queue = deque([(state, [])])
    while queue:
        # popleft() keeps the search breadth-first; pop() would make it depth-first.
        current_state, current_path = queue.popleft()
        if is_solved(current_state):
            return current_path
        for a in available_actions(current_state):
            new_state = do_action(current_state, a)
            key = new_state.tobytes()
            if key in visited:
                continue
            visited.add(key)
            queue.append((new_state, current_path + [a]))
    return None


# Run the uninformed search on the shuffled board.
solution = solve(state)
# Bare expression: shown as the cell output in the notebook.
solution

In [None]:
# Number of moves in the path found above (cell output).
len(solution)

In [None]:

def is_solved(state: np.ndarray) -> bool:
    """Return True when ``state`` is the solved layout for its own shape.

    The solved layout is 1, 2, ..., n - 1 in row-major order followed by the
    blank (0) in the last cell, where n is the number of cells. The goal is
    derived from ``state`` itself, so the check does not depend on the global
    ``PUZZLE_DIM``.
    """
    state = np.asarray(state)
    # arange(n) rolled left by one is [1, 2, ..., n - 1, 0].
    goal = np.roll(np.arange(state.size), -1).reshape(state.shape)
    return bool(np.array_equal(state, goal))

def heuristic(state: np.ndarray) -> int:
    """Return the Manhattan-distance heuristic of ``state``.

    For every tile except the blank (0), add the row and column distance
    between its current cell and its cell in the solved layout (tile ``t``
    belongs at ``divmod(t - 1, columns)``).

    The board size is read from ``state.shape`` instead of the global
    ``PUZZLE_DIM``, and goal cells are computed arithmetically rather than by
    rebuilding a lookup dictionary on every call.
    """
    cols = state.shape[1]
    dist = 0
    for (row, col), tile in np.ndenumerate(state):
        if tile == 0:  # the blank does not contribute
            continue
        target_row, target_col = divmod(int(tile) - 1, cols)
        dist += abs(row - target_row) + abs(col - target_col)
    return dist

def solve2(state: np.ndarray) -> tuple[list, int]:
    """A* search from ``state`` to the solved board using ``heuristic``.

    The priority of a board is its path length plus ``heuristic(board)``.
    Boards are stored in the heap as raw bytes and decoded with the dtype and
    shape of ``state``. ``visited`` maps each board to the cheapest path
    length at which it has been queued so far.

    Returns:
        ``(path, evaluated)`` where ``path`` is the list of actions to the
        solved board and ``evaluated`` is the number of actions generated.
        When the queue empties without a solution the result is
        ``(None, evaluated)`` so callers can always unpack two values.
    """
    counter_action_evaluated = 0
    start_bytes = state.tobytes()
    visited = {start_bytes: 0}  # board bytes -> cheapest known path length
    queue = []
    # Priority of the first entry is irrelevant: it is the only one in the heap.
    heappush(queue, (0, start_bytes, []))

    while queue:
        _, current_state_bytes, current_path = heappop(queue)

        # Decode the board so it can be tested and expanded.
        current_state = np.frombuffer(current_state_bytes, dtype=state.dtype).reshape(state.shape)

        if is_solved(current_state):
            return current_path, counter_action_evaluated

        new_cost = len(current_path) + 1
        for a in available_actions(current_state):
            counter_action_evaluated += 1
            new_state = do_action(current_state, a)
            new_state_bytes = new_state.tobytes()

            # Queue the board only if it is new or reached by a shorter path.
            if new_state_bytes not in visited or visited[new_state_bytes] > new_cost:
                visited[new_state_bytes] = new_cost
                priority = new_cost + heuristic(new_state)
                heappush(queue, (priority, new_state_bytes, current_path + [a]))

    return None, counter_action_evaluated

# Run the Manhattan-distance A* on the shuffled board; the second value is
# the number of actions the solver generated.
solution,cost = solve2(state)
#Joblib
print(solution,cost)

In [None]:
# Number of moves in the path found above (cell output).
len(solution)

In [None]:
def is_solved(state: np.ndarray) -> bool:
    """Return True when ``state`` is the solved layout for its own shape.

    The solved layout is 1, 2, ..., n - 1 in row-major order followed by the
    blank (0) in the last cell, where n is the number of cells. The goal is
    derived from ``state`` itself, so the check does not depend on the global
    ``PUZZLE_DIM``.
    """
    state = np.asarray(state)
    # arange(n) rolled left by one is [1, 2, ..., n - 1, 0].
    goal = np.roll(np.arange(state.size), -1).reshape(state.shape)
    return bool(np.array_equal(state, goal))
#A* with improved heuristic
def heuristic(state: np.ndarray) -> int:
    """Return Manhattan distance plus a pairwise linear-conflict penalty.

    Manhattan part: for every non-blank tile, the row plus column distance to
    its cell in the solved layout.

    Conflict part: for a tile sitting in its goal row, every non-blank tile
    further right in that row that also belongs to the row but to an earlier
    column adds 2. The same rule is applied down each column. Every such
    pair is counted separately.
    """
    side = state.shape[0]
    goal_of = {r * side + c + 1: (r, c) for r in range(side) for c in range(side)}
    goal_of[0] = (side - 1, side - 1)  # goal cell of the blank

    total_distance = 0
    conflict_penalty = 0

    for r in range(side):
        for c in range(side):
            tile = state[r, c]
            if tile == 0:
                continue
            goal_r, goal_c = goal_of[tile]
            total_distance += abs(r - goal_r) + abs(c - goal_c)

            if r == goal_r:
                # Tile is in its goal row: look at the tiles to its right.
                for other_c in range(c + 1, side):
                    other = state[r, other_c]
                    if other == 0:
                        continue
                    other_goal_r, other_goal_c = goal_of[other]
                    if other_goal_r == r and other_goal_c < goal_c:
                        conflict_penalty += 2

            if c == goal_c:
                # Tile is in its goal column: look at the tiles below it.
                for other_r in range(r + 1, side):
                    other = state[other_r, c]
                    if other == 0:
                        continue
                    other_goal_r, other_goal_c = goal_of[other]
                    if other_goal_c == c and other_goal_r < goal_r:
                        conflict_penalty += 2

    return total_distance + conflict_penalty


def solve3(state: np.ndarray) -> tuple[list, int]:
    """A* search from ``state`` to the solved board using ``heuristic``.

    Uses whichever module-level ``heuristic`` is defined when the function is
    called. The priority of a board is its path length plus
    ``heuristic(board)``. Boards are stored in the heap as raw bytes and
    decoded with the dtype and shape of ``state``. ``visited`` maps each
    board to the cheapest path length at which it has been queued so far.

    Returns:
        ``(path, evaluated)`` where ``path`` is the list of actions to the
        solved board and ``evaluated`` is the number of actions generated.
        When the queue empties without a solution the result is
        ``(None, evaluated)`` so callers can always unpack two values.
    """
    counter_action_evaluated = 0
    start_bytes = state.tobytes()
    visited = {start_bytes: 0}  # board bytes -> cheapest known path length
    queue = []
    # Priority of the first entry is irrelevant: it is the only one in the heap.
    heappush(queue, (0, start_bytes, []))

    while queue:
        _, current_state_bytes, current_path = heappop(queue)

        # Decode the board so it can be tested and expanded.
        current_state = np.frombuffer(current_state_bytes, dtype=state.dtype).reshape(state.shape)

        if is_solved(current_state):
            return current_path, counter_action_evaluated

        new_cost = len(current_path) + 1
        for a in available_actions(current_state):
            counter_action_evaluated += 1
            new_state = do_action(current_state, a)
            new_state_bytes = new_state.tobytes()

            # Queue the board only if it is new or reached by a shorter path.
            if new_state_bytes not in visited or visited[new_state_bytes] > new_cost:
                visited[new_state_bytes] = new_cost
                priority = new_cost + heuristic(new_state)
                heappush(queue, (priority, new_state_bytes, current_path + [a]))

    return None, counter_action_evaluated

# Run A* with the Manhattan + linear-conflict heuristic on the shuffled
# board; the second value is the number of actions the solver generated.
solution,cost = solve3(state)
#Joblib
print(solution,cost)

Enhanced A* combining several heuristics — this is the version that works.

In [None]:

class PuzzleHeuristicService:
    """Heuristic estimates for the sliding puzzle with the standard goal.

    Goal cells are computed arithmetically (tile ``t`` belongs at
    ``divmod(t - 1, size)``); ``goal_state`` is stored but not consulted by
    any of the heuristics.
    """

    def __init__(self, goal_state: np.ndarray):
        self.goal_state = goal_state

    @staticmethod
    def _ordered_penalty(tiles) -> int:
        """Return 2 for every tile that is not larger than the running maximum."""
        penalty = 0
        highest = -1
        for tile in tiles:
            if tile > highest:
                highest = tile
            else:
                penalty += 2
        return penalty

    def heuristic_manhattan_distance(self, position: np.ndarray) -> int:
        """Sum, over non-blank tiles, of row plus column distance to the goal cell."""
        side = len(position)
        total = 0
        for r in range(side):
            for c in range(side):
                tile = position[r][c]
                if tile == 0:
                    continue
                goal_r, goal_c = divmod(tile - 1, side)
                total += abs(r - goal_r) + abs(c - goal_c)
        return total

    def heuristic_linear_conflict(self, position: np.ndarray) -> int:
        """Penalty of 2 per out-of-order tile among tiles already in their goal line.

        Each row is scanned left to right and each column top to bottom,
        looking only at tiles that belong to that row (or column).
        """
        side = len(position)
        penalty = 0

        # Rows: keep the tiles whose goal row is this row.
        for r in range(side):
            row_members = [
                position[r][c]
                for c in range(side)
                if position[r][c] != 0 and (position[r][c] - 1) // side == r
            ]
            penalty += self._ordered_penalty(row_members)

        # Columns: keep the tiles whose goal column is this column.
        for c in range(side):
            col_members = [
                position[r][c]
                for r in range(side)
                if position[r][c] != 0 and (position[r][c] - 1) % side == c
            ]
            penalty += self._ordered_penalty(col_members)

        return penalty

    def heuristic_walking_distance(self, position: np.ndarray) -> int:
        """Sum of per-tile Manhattan distances.

        Despite its name this computes the same quantity as
        ``heuristic_manhattan_distance``.
        """
        side = len(position)
        per_tile = (
            abs(r - (position[r][c] - 1) // side) + abs(c - (position[r][c] - 1) % side)
            for r in range(side)
            for c in range(side)
            if position[r][c] != 0
        )
        return sum(per_tile)

    def combined_heuristic(self, position: np.ndarray) -> int:
        """Return Manhattan + linear conflict + "walking" distance.

        Because the first and third terms are equal, this is twice the
        Manhattan distance plus the conflict penalty.
        """
        manhattan = self.heuristic_manhattan_distance(position)
        conflicts = self.heuristic_linear_conflict(position)
        walking = self.heuristic_walking_distance(position)
        return manhattan + conflicts + walking


In [None]:
def solve_with_enhanced_a_star(initial_state: np.ndarray, goal_state: np.ndarray) -> tuple[list, int]:
    """Best-first search guided by ``PuzzleHeuristicService.combined_heuristic``.

    Heap entries are ``(f, g, board_bytes, path)`` with ``f = g + h``. A
    board is added to the closed set when it is popped; children already in
    the closed set are not queued.

    Returns:
        ``(path, evaluated)`` on success and ``(None, evaluated)`` when the
        heap empties, where ``evaluated`` counts the actions generated.
    """
    estimate = PuzzleHeuristicService(goal_state).combined_heuristic

    frontier = []
    heappush(frontier, (estimate(initial_state), 0, initial_state.tobytes(), []))
    closed = set()
    target_bytes = goal_state.tobytes()
    counter_action_evaluated = 0

    while frontier:
        # Cheapest f first; ties fall back to g, then to the remaining fields.
        _, cost_so_far, node_bytes, moves = heappop(frontier)
        node = np.frombuffer(node_bytes, dtype=initial_state.dtype).reshape(initial_state.shape)

        if node_bytes == target_bytes:
            return moves, counter_action_evaluated

        closed.add(node_bytes)
        child_cost = cost_so_far + 1

        for act in available_actions(node):
            counter_action_evaluated += 1
            child = do_action(node, act)
            child_bytes = child.tobytes()
            if child_bytes not in closed:
                child_priority = child_cost + estimate(child)
                heappush(frontier, (child_priority, child_cost, child_bytes, moves + [act]))

    return None, counter_action_evaluated

# Solved layout: 1..PUZZLE_DIM**2 - 1 in row-major order, blank (0) last.
goal_state = np.array([i for i in range(1, PUZZLE_DIM**2)] + [0]).reshape((PUZZLE_DIM, PUZZLE_DIM))
# Search from the shuffled board towards the solved layout.
path, evaluated_actions = solve_with_enhanced_a_star(state, goal_state)
print("Path to solution:", path)
print("Number of actions evaluated:", evaluated_actions)

In [None]:
def is_solved(state: np.ndarray) -> bool:
    """Return True when ``state`` is the solved layout for its own shape.

    The solved layout is 1, 2, ..., n - 1 in row-major order followed by the
    blank (0) in the last cell, where n is the number of cells. The goal is
    derived from ``state`` itself, so the check does not depend on the global
    ``PUZZLE_DIM``.
    """
    state = np.asarray(state)
    # arange(n) rolled left by one is [1, 2, ..., n - 1, 0].
    goal = np.roll(np.arange(state.size), -1).reshape(state.shape)
    return bool(np.array_equal(state, goal))

#Let's try to apply actions to the initial state to see if they work
# Work on a copy so the shuffled board in `state` stays available to later cells.
current_state = state.copy()
# Replay every move of the found path in order.
for act in path:
    current_state = do_action(current_state, act)
print("Is the puzzle solved?", is_solved(current_state))
print(current_state)

IDA*

In [None]:
from typing import Tuple, Union, List, Set

class PuzzleHeuristicService:
    """Heuristic estimates for the sliding puzzle with the standard goal.

    ``target_positions`` maps every tile to its (row, column) in the solved
    layout: tile ``t`` belongs at ``divmod(t - 1, size)`` and the blank (0)
    in the bottom-right corner. The board side is taken from
    ``goal_state.shape[0]``.
    """

    def __init__(self, goal_state: np.ndarray):
        self.goal_state = goal_state
        size = goal_state.shape[0]
        # Tiles are numbered from 1, so tile t occupies row-major index t - 1.
        self.target_positions = {
            tile: divmod(tile - 1, size) for tile in range(1, size * size)
        }
        self.target_positions[0] = (size - 1, size - 1)

    def heuristic_manhattan_distance(self, position: np.ndarray) -> int:
        """Sum, over non-blank tiles, of row plus column distance to the goal cell."""
        distance = 0
        for (row, col), tile in np.ndenumerate(position):
            if tile != 0:
                target_row, target_col = self.target_positions[int(tile)]
                distance += abs(row - target_row) + abs(col - target_col)
        return distance

    def heuristic_linear_conflict(self, position: np.ndarray) -> int:
        """Penalty of 2 per out-of-order tile among tiles already in their goal line.

        Each row is scanned left to right and each column top to bottom,
        looking only at tiles that belong to that row (or column); a tile
        that is not larger than the running maximum adds 2.
        """
        conflict = 0
        size = position.shape[0]

        # Row conflicts
        for row in range(size):
            max_val = -1
            for col in range(size):
                value = position[row][col]
                if value != 0 and (value - 1) // size == row:
                    if value > max_val:
                        max_val = value
                    else:
                        conflict += 2

        # Column conflicts
        for col in range(size):
            max_val = -1
            for row in range(size):
                value = position[row][col]
                if value != 0 and (value - 1) % size == col:
                    if value > max_val:
                        max_val = value
                    else:
                        conflict += 2

        return conflict

    def heuristic_walking_distance(self, position: np.ndarray) -> int:
        """Return the per-tile Manhattan sum.

        Despite its name this is the same quantity as
        ``heuristic_manhattan_distance``, so it simply delegates to it.
        """
        return self.heuristic_manhattan_distance(position)

    def combined_heuristic(self, position: np.ndarray) -> int:
        """Return Manhattan + linear conflict + "walking" distance.

        Because the first and third terms are equal, this is twice the
        Manhattan distance plus the conflict penalty; it can exceed the true
        number of remaining moves.
        """
        return (
            self.heuristic_manhattan_distance(position)
            + self.heuristic_linear_conflict(position)
            + self.heuristic_walking_distance(position)
        )



def ida_star(initial_state: np.ndarray, goal_state: np.ndarray) -> tuple[list, int]:
    """Iterative-deepening A* guided by ``PuzzleHeuristicService.combined_heuristic``.

    Repeatedly runs a depth-first search bounded by ``f = g + h``; after each
    unsuccessful pass the bound is raised to the smallest ``f`` that exceeded
    it. Boards on the current search path are remembered so the search never
    walks in a cycle.

    Returns:
        ``(path, evaluated)`` where ``path`` is the list of actions to
        ``goal_state`` and ``evaluated`` is the number of actions generated
        over all passes (the same statistic the other solvers report; the
        path length is simply ``len(path)``). If no bound can be raised any
        further the result is ``(None, evaluated)``.
    """
    estimate = PuzzleHeuristicService(goal_state).combined_heuristic
    evaluated_actions = 0

    def search(state, g, threshold, path, on_path):
        """Bounded depth-first search.

        Returns ``(f, path)`` when the goal is reached, otherwise
        ``(next_threshold, None)`` with the smallest ``f`` seen above the bound.
        """
        nonlocal evaluated_actions
        f = g + estimate(state)

        # Test the goal before the bound so a solution is never discarded
        # just because the heuristic is not zero on the goal.
        if np.array_equal(state, goal_state):
            return f, path

        if f > threshold:
            return f, None

        next_threshold = float('inf')
        for move in available_actions(state):
            evaluated_actions += 1
            next_state = do_action(state, move)

            # Skip boards already on the current path (cycle avoidance).
            key = next_state.tobytes()
            if key in on_path:
                continue

            on_path.add(key)
            result, solution = search(next_state, g + 1, threshold, path + [move], on_path)
            on_path.remove(key)

            if solution is not None:
                return result, solution

            next_threshold = min(next_threshold, result)

        return next_threshold, None

    threshold = estimate(initial_state)
    on_path = {initial_state.tobytes()}

    while True:
        next_threshold, solution = search(initial_state, 0, threshold, [], on_path)

        if solution is not None:
            return solution, evaluated_actions

        # No node exceeded the bound, so there is nothing left to explore.
        if next_threshold == float('inf'):
            return None, evaluated_actions

        threshold = next_threshold

# Solved layout: 1..PUZZLE_DIM**2 - 1 in row-major order, blank (0) last.
goal_state = np.array([i for i in range(1, PUZZLE_DIM**2)] + [0]).reshape((PUZZLE_DIM, PUZZLE_DIM))
# Run IDA* from the shuffled board towards the solved layout.
path, evaluated_actions = ida_star(state, goal_state)

DDA*

In [None]:
from heapq import heappush, heappop
from typing import List, Tuple, Dict, Set, Union
import numpy as np

class EnhancedPuzzleHeuristicService:
    """Manhattan distance plus a row-only linear-conflict penalty.

    The Manhattan part assumes the standard goal (tile ``t`` belongs at
    ``divmod(t - 1, size)``); the conflict part reads the goal rows from
    ``goal_state``.
    """

    def __init__(self, goal_state: np.ndarray):
        self.goal_state = goal_state

    def heuristic_manhattan_distance(self, state: np.ndarray) -> int:
        """Sum, over non-blank tiles, of row plus column distance to the goal cell."""
        distance = 0
        size = len(state)
        for i in range(size):
            for j in range(size):
                tile = state[i][j]
                if tile != 0:
                    target_row, target_col = divmod(tile - 1, size)
                    distance += abs(i - target_row) + abs(j - target_col)
        return distance

    def heuristic_linear_conflict(self, state: np.ndarray) -> int:
        """Return 2 for every inverted pair of tiles sitting in their goal row.

        Only tiles that belong to the row they are currently in take part.
        The blank (0) is not a tile and is ignored, so it can never form a
        conflict with the tiles of the last row.

        NOTE(review): every inverted pair is counted, so three or more
        mutually inverted tiles are penalised more than the classic
        linear-conflict heuristic would.
        """
        conflict = 0
        size = state.shape[0]
        for row in range(size):
            # Tiles whose goal row is this row, blank excluded.
            goal_tiles = set(self.goal_state[row].tolist()) - {0}
            in_place = [tile for tile in state[row].tolist() if tile in goal_tiles]
            for i, left in enumerate(in_place):
                for right in in_place[i + 1:]:
                    if left > right:
                        conflict += 2
        return conflict

    def combined_heuristic(self, state: np.ndarray) -> int:
        """Return Manhattan distance plus the linear-conflict penalty."""
        return self.heuristic_manhattan_distance(state) + self.heuristic_linear_conflict(state)


def enhanced_a_star(
    initial_state: np.ndarray,
    goal_state: np.ndarray,
    heuristic_service: EnhancedPuzzleHeuristicService,
) -> Tuple[Union[List, None], int]:
    """A* search using ``heuristic_service.combined_heuristic`` as the estimate.

    Heap entries are ``(f, g, board_bytes, path)``. ``best_cost`` records the
    cost at which each board was expanded; a popped board is skipped when it
    was already expanded at an equal or lower cost, and a child is queued
    only when it has not been expanded at an equal or lower cost.

    Returns:
        ``(path, evaluated)`` on success and ``(None, evaluated)`` when the
        heap empties, where ``evaluated`` counts the actions generated.
    """
    estimate = heuristic_service.combined_heuristic

    frontier = []
    heappush(frontier, (estimate(initial_state), 0, initial_state.tobytes(), []))
    best_cost: Dict[bytes, int] = {}  # board bytes -> cost at expansion

    evaluated_actions = 0
    target_bytes = goal_state.tobytes()

    while frontier:
        _, cost, node_bytes, moves = heappop(frontier)
        node = np.frombuffer(node_bytes, dtype=initial_state.dtype).reshape(initial_state.shape)

        if node_bytes == target_bytes:
            return moves, evaluated_actions

        # Unknown boards default to cost + 1, which never triggers the skip.
        if best_cost.get(node_bytes, cost + 1) <= cost:
            continue
        best_cost[node_bytes] = cost

        child_cost = cost + 1
        for move in available_actions(node):
            evaluated_actions += 1
            child = do_action(node, move)
            child_bytes = child.tobytes()
            child_priority = child_cost + estimate(child)

            # Unknown boards default to child_cost + 1, which always passes.
            if best_cost.get(child_bytes, child_cost + 1) > child_cost:
                heappush(frontier, (child_priority, child_cost, child_bytes, moves + [move]))

    return None, evaluated_actions

# Solved layout: 1..PUZZLE_DIM**2 - 1 in row-major order, blank (0) last.
goal_state = np.array([i for i in range(1, PUZZLE_DIM**2)] + [0]).reshape((PUZZLE_DIM, PUZZLE_DIM))
# Module-level service instance handed to the solver below.
heuristic_service = EnhancedPuzzleHeuristicService(goal_state)
path, evaluated_actions = enhanced_a_star(state, goal_state, heuristic_service)


Bidirectional A*

In [None]:
import numpy as np
from heapq import heappush, heappop
from typing import List, Dict, Tuple



def bidirectional_a_star(initial_state: np.ndarray, goal_state: np.ndarray) -> Tuple[List[int], int]:
    """Search from both ends and join the two half-paths where they meet.

    A forward search starts at ``initial_state`` and is guided by the
    Manhattan distance to ``goal_state``; a backward search starts at
    ``goal_state`` and is guided by the Manhattan distance to
    ``initial_state``. The two searches expand one node each in turn. Every
    board a search has generated is stored together with the best path found
    to it, so when one side pops a board the other side already knows, the
    full route is the forward path to that board followed by the backward
    path replayed in reverse.

    The search stops at the first meeting point, so the returned path is not
    guaranteed to be the shortest one.

    Returns:
        ``(path, evaluated)`` where ``path`` is a list of actions leading
        from ``initial_state`` to ``goal_state`` and ``evaluated`` is the
        number of actions generated by both searches; ``(None, evaluated)``
        if either frontier empties first.
    """
    shape = initial_state.shape
    dtype = initial_state.dtype
    # Both sides must serialise boards identically to recognise a meeting.
    goal_state = np.asarray(goal_state, dtype=dtype)

    def locate_tiles(board):
        """Map every tile value of ``board`` to its (row, column)."""
        return {int(tile): cell for cell, tile in np.ndenumerate(board)}

    def manhattan(board, targets):
        """Manhattan distance of all non-blank tiles of ``board`` to ``targets``."""
        total = 0
        for (row, col), tile in np.ndenumerate(board):
            if tile != 0:
                target_row, target_col = targets[int(tile)]
                total += abs(row - target_row) + abs(col - target_col)
        return total

    initial_bytes = initial_state.tobytes()
    goal_bytes = goal_state.tobytes()

    forward_targets = locate_tiles(goal_state)
    backward_targets = locate_tiles(initial_state)

    # board bytes -> best known path from the respective start
    paths_forward = {initial_bytes: []}
    paths_backward = {goal_bytes: []}

    open_set_forward = [(manhattan(initial_state, forward_targets), 0, initial_bytes, [])]
    open_set_backward = [(manhattan(goal_state, backward_targets), 0, goal_bytes, [])]

    evaluated_actions = 0

    def expand(open_set, own_paths, other_paths, targets):
        """Pop and expand one node; return its bytes if the other side knows it."""
        nonlocal evaluated_actions
        _, g_score, current_bytes, path = heappop(open_set)

        if current_bytes in other_paths:
            return current_bytes

        # A shorter path to this board was queued later: this entry is stale.
        if len(own_paths[current_bytes]) < g_score:
            return None

        current_state = np.frombuffer(current_bytes, dtype=dtype).reshape(shape)
        next_g_score = g_score + 1
        for move in available_actions(current_state):
            evaluated_actions += 1
            next_state = do_action(current_state, move)
            next_bytes = next_state.tobytes()
            known = own_paths.get(next_bytes)
            if known is None or len(known) > next_g_score:
                next_path = path + [move]
                own_paths[next_bytes] = next_path
                next_f_score = next_g_score + manhattan(next_state, targets)
                heappush(open_set, (next_f_score, next_g_score, next_bytes, next_path))
        return None

    def join(meet_node):
        """Forward path to the meeting board plus the backward path undone."""
        # A move swaps two cells, so it is undone by the same swap; pos1/pos2
        # are exchanged so pos1 is again the blank's cell when replayed.
        undo = [
            move._replace(pos1=move.pos2, pos2=move.pos1)
            for move in reversed(paths_backward[meet_node])
        ]
        return paths_forward[meet_node] + undo

    while open_set_forward and open_set_backward:
        meet_node = expand(open_set_forward, paths_forward, paths_backward, forward_targets)
        if meet_node is None:
            meet_node = expand(open_set_backward, paths_backward, paths_forward, backward_targets)
        if meet_node is not None:
            return join(meet_node), evaluated_actions

    return None, evaluated_actions  # No solution found
# Solved layout: 1..PUZZLE_DIM**2 - 1 in row-major order, blank (0) last.
goal_state = np.array([i for i in range(1, PUZZLE_DIM**2)] + [0]).reshape((PUZZLE_DIM, PUZZLE_DIM))
# Run the bidirectional search between the shuffled board and the solved layout.
path, evaluated_actions = bidirectional_a_star(state, goal_state)
print("Path to solution:", path)


Taking the best of the previously defined A* versions and trying to parallelize it with Joblib to speed up the computation

In [None]:
from heapq import heappush, heappop
from typing import List, Tuple, Set
import numpy as np
from joblib import Parallel, delayed



def solve_with_enhanced_a_star_parallel(initial_state: np.ndarray, goal_state: np.ndarray, n_jobs: int = -1) -> Tuple[List[int], int]:
    """Best-first search whose successor evaluation is spread over joblib threads.

    Works like the sequential version: heap entries are
    ``(f, g, board_bytes, path)``, a board joins the closed set when it is
    popped, and children already in the closed set are dropped. For each
    expanded board, every legal move is evaluated as its own joblib task
    (apply the move, serialise the board, compute the heuristic), so each
    successor is produced exactly once whatever ``n_jobs`` is, including the
    default ``-1``.

    Args:
        initial_state: board to start from.
        goal_state: board to reach.
        n_jobs: passed to ``joblib.Parallel``.

    Returns:
        ``(path, evaluated)`` on success and ``(None, evaluated)`` when the
        heap empties, where ``evaluated`` counts the actions generated.
    """
    heuristic_service = PuzzleHeuristicService(goal_state)

    def calculate_heuristic(state: np.ndarray) -> int:
        return heuristic_service.combined_heuristic(state)

    # Priority queue: (f_score, g_score, state_bytes, path)
    open_set = []
    heappush(open_set, (calculate_heuristic(initial_state), 0, initial_state.tobytes(), []))
    visited: Set[bytes] = set()
    goal_state_bytes = goal_state.tobytes()

    counter_action_evaluated = 0

    def evaluate(state: np.ndarray, act):
        """Apply one move; return (bytes, heuristic, move) or None if closed.

        Only reads shared data: ``visited`` is not modified while the tasks
        of one expansion are running.
        """
        next_state = do_action(state, act)
        next_bytes = next_state.tobytes()
        if next_bytes in visited:
            return None
        return next_bytes, calculate_heuristic(next_state), act

    # One worker pool for the whole search instead of one per expanded node.
    with Parallel(n_jobs=n_jobs, backend="threading") as parallel:
        while open_set:
            # Extract the node with the lowest f_score
            f_score, g_score, current_bytes, path = heappop(open_set)
            current_state = np.frombuffer(current_bytes, dtype=initial_state.dtype).reshape(initial_state.shape)

            if current_bytes == goal_state_bytes:
                return path, counter_action_evaluated

            visited.add(current_bytes)

            actions = available_actions(current_state)
            # Counted here, in the main thread, so the total is exact.
            counter_action_evaluated += len(actions)

            results = parallel(delayed(evaluate)(current_state, act) for act in actions)

            new_g_score = g_score + 1
            for result in results:
                if result is None:
                    continue
                next_bytes, h_score, act = result
                heappush(open_set, (new_g_score + h_score, new_g_score, next_bytes, path + [act]))

    return None, counter_action_evaluated  # No solution found


# Test the parallelized A* search
# Solved layout: 1..PUZZLE_DIM**2 - 1 in row-major order, blank (0) last.
goal_state = np.array([i for i in range(1, PUZZLE_DIM**2)] + [0]).reshape((PUZZLE_DIM, PUZZLE_DIM))
# Run the joblib-backed solver with 20 jobs.
path, evaluated_actions = solve_with_enhanced_a_star_parallel(state, goal_state, n_jobs=20)

print("Path to solution:", path)
print("Number of actions evaluated:", evaluated_actions)


In [None]:
# Replay the latest `path` starting from a fresh copy of the shuffled board;
# reusing a `current_state` left over from an earlier cell would apply the
# moves to the wrong starting position.
current_state = state.copy()
for act in path:
    current_state = do_action(current_state, act)
print("Is the puzzle solved?", is_solved(current_state))