Copyright **`(c)`** 2024 Giovanni Squillero `<giovanni.squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free under certain conditions — see the [`license`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  

In [250]:
from collections import namedtuple
from random import choice
from tqdm.auto import tqdm
import numpy as np
from heapq import heappush, heappop
from typing import List, Callable, Tuple

In [251]:
# Side length of the square board; 3 gives a 3x3 grid holding tiles 1..8 and a blank (0).
PUZZLE_DIM = 3
# A move, expressed as the two board positions whose contents are exchanged.
Action = namedtuple('Action', ['pos1', 'pos2'])

In [252]:
def available_actions(state: np.ndarray) -> list['Action']:
    """Return every move the blank (0) cell can make on *state*.

    Each returned Action has the blank's (row, column) as ``pos1`` and an
    orthogonally adjacent cell as ``pos2``. Candidates are listed in the
    order row - 1, row + 1, column - 1, column + 1, keeping only those that
    stay inside the PUZZLE_DIM x PUZZLE_DIM board.
    """
    blank_rows, blank_cols = np.where(state == 0)
    x, y = int(blank_rows[0]), int(blank_cols[0])

    # (is the move inside the board?, destination cell) in the fixed order above.
    candidates = (
        (x > 0, (x - 1, y)),
        (x < PUZZLE_DIM - 1, (x + 1, y)),
        (y > 0, (x, y - 1)),
        (y < PUZZLE_DIM - 1, (x, y + 1)),
    )
    return [Action((x, y), target) for inside, target in candidates if inside]

def do_action(state: np.ndarray, Action: 'Action') -> np.ndarray:
    """Return a copy of *state* with the cells at pos1 and pos2 exchanged.

    The input array is left untouched; the swap is applied to a fresh copy.
    """
    successor = state.copy()
    first_pos, second_pos = Action.pos1, Action.pos2
    # Read both cells before writing either one.
    at_second = successor[second_pos]
    at_first = successor[first_pos]
    successor[first_pos] = at_second
    successor[second_pos] = at_first
    return successor

def is_goal(state: np.ndarray) -> bool:
    """Tell whether *state* is the solved board.

    The solved board lists 1 .. PUZZLE_DIM**2 - 1 in row-major order and
    ends with the blank (0) in the last cell.
    """
    ordered_tiles = np.arange(1, PUZZLE_DIM**2)
    solved = np.append(ordered_tiles, 0).reshape((PUZZLE_DIM, PUZZLE_DIM))
    return np.array_equal(state, solved)

In [253]:
RANDOMIZE_STEPS = 100_000

def randomize_puzzle():
    """Return a board obtained by a random walk from the solved layout.

    Starting from the solved board, RANDOMIZE_STEPS moves are applied, each
    one picked uniformly among the moves available to the blank at that
    point. A tqdm progress bar labelled 'Randomizing' tracks the walk.
    """
    board = np.append(np.arange(1, PUZZLE_DIM**2), 0).reshape((PUZZLE_DIM, PUZZLE_DIM))
    for _ in tqdm(range(RANDOMIZE_STEPS), desc='Randomizing'):
        legal_moves = available_actions(board)
        board = do_action(board, choice(legal_moves))
    return board

# Shuffle a board, show it, and report whether it happens to be the solved layout.
state = randomize_puzzle()
print(state)
print(is_goal(state))

Randomizing:   0%|          | 0/100000 [00:00<?, ?it/s]

[[3 7 0]
 [5 2 4]
 [1 6 8]]
False


In [254]:
# Sanity check for is_goal: the solved layout must be reported as the goal.
state = np.array([i for i in range(1, PUZZLE_DIM**2)] + [0]).reshape((PUZZLE_DIM, PUZZLE_DIM))
print(state)
print(is_goal(state))
# Exchange two pairs of tiles in place (direct cell swaps, not blank moves)
# so that the board no longer matches the goal.
state[0, 0], state[0, 1] = state[0, 1], state[0, 0]
state[1, 2], state[0, 2] = state[0, 2], state[1, 2]

print(state)
print(is_goal(state))

[[1 2 3]
 [4 5 6]
 [7 8 0]]
True
[[2 1 6]
 [4 5 3]
 [7 8 0]]
False


In [255]:
def manhattan_distance(state):
    """Return the summed Manhattan distance of all tiles from their goal cells.

    Tile ``t`` belongs at ``divmod(t - 1, PUZZLE_DIM)`` (row-major order);
    the blank (0) is not counted.
    """
    total = 0
    cells = ((row, col) for row in range(PUZZLE_DIM) for col in range(PUZZLE_DIM))
    for row, col in cells:
        value = state[row, col]
        if value == 0:
            # The blank contributes nothing.
            continue
        home_row, home_col = divmod(value - 1, PUZZLE_DIM)
        total += abs(row - home_row) + abs(col - home_col)
    return total


In [256]:
def _tiles_to_move_aside(goal_positions):
    """Return how many entries must be dropped to leave an increasing sequence.

    *goal_positions* holds, in board order along one row or column, the goal
    index of each tile that belongs to that line. The answer is the length
    of the list minus the length of its longest increasing subsequence.
    """
    longest_ending_here = []
    for idx, pos in enumerate(goal_positions):
        best_prefix = max(
            (longest_ending_here[k] for k in range(idx) if goal_positions[k] < pos),
            default=0,
        )
        longest_ending_here.append(best_prefix + 1)
    return len(goal_positions) - max(longest_ending_here, default=0)


def manhattan_distance_linear_conflict(state):
    """Return the Manhattan distance of *state* plus a linear-conflict penalty.

    The goal layout is row-major 1, 2, ... with the blank (0) last, so tile
    ``t`` belongs at ``divmod(t - 1, columns)``. The board size is taken from
    ``state.shape``.

    Two tiles are in linear conflict when both sit in the row (or column)
    they belong to but in reversed order: one of them has to leave the line
    and come back, costing two moves the Manhattan distance does not see.
    For every row and column, the minimum number of tiles that must step
    aside is counted and charged 2 moves each. Counting tiles to remove
    rather than reversed pairs avoids over-charging when three tiles are
    mutually reversed.

    Args:
        state: 2-D array with the blank encoded as 0.

    Returns:
        The heuristic value as a plain ``int``.
    """
    n_rows, n_cols = state.shape
    manhattan = 0
    # For each line, the goal index (along that line) of every tile that is
    # already in its goal line, listed in board order.
    row_lines = [[] for _ in range(n_rows)]
    col_lines = [[] for _ in range(n_cols)]

    for row in range(n_rows):
        for col in range(n_cols):
            tile = int(state[row, col])
            if tile == 0:
                # The blank is never part of the distance or of a conflict.
                continue
            goal_row, goal_col = divmod(tile - 1, n_cols)
            manhattan += abs(row - goal_row) + abs(col - goal_col)
            if goal_row == row:
                row_lines[row].append(goal_col)
            if goal_col == col:
                col_lines[col].append(goal_row)

    detours = sum(_tiles_to_move_aside(line) for line in row_lines + col_lines)
    return manhattan + 2 * detours


def count_linear_conflicts(conflict_list: list[tuple[int, int]]) -> int:
    """Count the out-of-order pairs in *conflict_list*.

    Each entry is ``(tile, goal_position)``. A pair counts when the entry
    that appears earlier in the list has the larger goal position; only the
    goal positions are compared, the tile values are ignored.
    """
    total = 0
    for idx, (_, earlier_goal) in enumerate(conflict_list):
        later_entries = conflict_list[idx + 1:]
        total += sum(1 for _, later_goal in later_entries if earlier_goal > later_goal)
    return total

In [257]:
# Check manhattan_distance on a board that differs from the goal by a single swap.
goal_state = np.array([i for i in range(1, PUZZLE_DIM**2)] + [0]).reshape((PUZZLE_DIM, PUZZLE_DIM))

state = goal_state.copy()
# Exchange tiles 2 and 3 in the top row: each ends one column from home,
# so the expected distance is 2.
state[0,1], state[0,2] = state[0,2], state[0,1]
print(goal_state, state)

distance = manhattan_distance(state)
print(distance)

[[1 2 3]
 [4 5 6]
 [7 8 0]] [[1 3 2]
 [4 5 6]
 [7 8 0]]
2


In [258]:
def reconstruct_path(cameFrom: dict, current_tuple: Tuple[int], shape: Tuple[int]) -> List[np.ndarray]:
    """Rebuild the sequence of boards that leads to *current_tuple*.

    *cameFrom* maps a flattened state to its predecessor. The chain is
    followed backwards until a state with no recorded predecessor is
    reached; that first state is not included. The result runs from the
    earliest board to *current_tuple*, each reshaped to *shape*.
    """
    backwards = []
    node = current_tuple
    while node in cameFrom:
        backwards.append(tuple_to_state(node, shape))
        node = cameFrom[node]
    return list(reversed(backwards))


def state_to_tuple(state: np.ndarray) -> Tuple[int]:
    """Flatten *state* into a tuple so it can serve as a dict key or set member."""
    flat_cells = state.flatten()
    return tuple(flat_cells)

# Inverse of state_to_tuple: rebuild the board array from its flattened form
def tuple_to_state(state_tuple: Tuple[int], shape: Tuple[int]) -> np.ndarray:
    """Return a new array holding the values of *state_tuple* arranged as *shape*."""
    flat_cells = np.array(state_tuple)
    return flat_cells.reshape(shape)

# A* Algorithm
def a_star(start_state: np.ndarray, heuristic: Callable[[np.ndarray], int]) -> List[np.ndarray]:
    """Search for a sequence of boards leading from *start_state* to the goal.

    States are expanded in order of ``g + heuristic(state)``, where ``g`` is
    the number of moves made so far (every move costs 1). The goal is the
    row-major layout 1 .. PUZZLE_DIM**2 - 1 followed by the blank (0).

    Args:
        start_state: Board to solve, with the blank encoded as 0.
        heuristic: Estimate of the remaining number of moves for a board.
            Whether the returned path is the shortest one depends on this
            estimate never exceeding the true cost.

    Returns:
        The boards visited after the start, ending with the goal, so the
        length of the list is the number of moves (an empty list when
        *start_state* is already the goal). ``None`` when the queue runs
        dry without reaching the goal.
    """
    shape = start_state.shape
    start_tuple = state_to_tuple(start_state)
    goal_tuple = state_to_tuple(
        np.array([i for i in range(1, PUZZLE_DIM**2)] + [0]).reshape((PUZZLE_DIM, PUZZLE_DIM))
    )

    g = {start_tuple: 0}  # best known number of moves to reach each state
    came_from = {}        # predecessor on that best known path
    visited = set()       # states already expanded

    # Open set: (g + h, flattened state); a one-element list is already a heap.
    priority_queue = [(heuristic(start_state), start_tuple)]

    while priority_queue:
        # Get state with the lowest f
        _, current_tuple = heappop(priority_queue)

        # Goal test
        if current_tuple == goal_tuple:
            return reconstruct_path(came_from, current_tuple, shape)

        # A state can sit in the heap several times; only its first pop is expanded.
        if current_tuple in visited:
            continue
        visited.add(current_tuple)

        current_state = tuple_to_state(current_tuple, shape)
        g_score = g[current_tuple] + 1  # Cost of moving to any neighbor

        # Explore neighbors
        for action in available_actions(current_state):
            neighbor = do_action(current_state, action)
            neighbor_tuple = state_to_tuple(neighbor)

            if g_score < g.get(neighbor_tuple, float('inf')):
                # Record best path so far
                came_from[neighbor_tuple] = current_tuple
                g[neighbor_tuple] = g_score

                if neighbor_tuple not in visited:
                    heappush(priority_queue, (g_score + heuristic(neighbor), neighbor_tuple))

    return None  # No solution found

In [259]:
# Solve a freshly shuffled board with A* guided by the linear-conflict heuristic.
state = randomize_puzzle()
result = a_star(state, manhattan_distance_linear_conflict)
# The returned path omits the start board, so its length is the number of moves.
print(len(result), 'steps')


Randomizing:   0%|          | 0/100000 [00:00<?, ?it/s]

16 steps


In [260]:
# def solve_puzzle(start_state):
#     # Priority queue (min-heap), stores (f_cost, state, path_to_reach_state, g_cost)
#     priority_queue = []
#     heappush(priority_queue, (manhattan_distance_linear_conflict(start_state), tuple(start_state.flatten()), [], 0))
    
#     visited = set()

#     while priority_queue:
#         f_cost, current_state_tuple, path, g_cost = heappop(priority_queue)
#         current_state = np.array(current_state_tuple).reshape(PUZZLE_DIM, PUZZLE_DIM)  # Convert tuple back to numpy array

#         # If we reach the goal state, return the path
#         if is_goal(current_state):
#             return path
        
#         # If already visited, skip this state
#         if current_state_tuple in visited:
#             continue
#         visited.add(current_state_tuple)
        
#         # Explore all valid actions
#         for act in available_actions(current_state):
#             new_state = do_action(current_state, act)
#             new_g_cost = g_cost + 1
#             new_h_cost = manhattan_distance_linear_conflict(new_state)
#             new_f_cost = new_g_cost + new_h_cost
#             new_path = path + [act]
#             new_state_tuple = tuple(new_state.flatten())  # Convert the new state to tuple

#             # Add the new state to the priority queue
#             heappush(priority_queue, (new_f_cost, new_state_tuple, new_path, new_g_cost))

#     return None  # No solution found

In [261]:
# start_state = randomize_puzzle()
# print("Start state:")
# print(start_state)

# solution = solve_puzzle(start_state)
# if solution:
#     print(f"Solution found in {len(solution)} steps:")
#     #for action in solution:
#         #print(action)
# else:
#     print("No solution found")