In [1]:
import random
import heapq
from dataclasses import dataclass, field
from typing import List, Tuple, Callable, Any, Set
import numpy as np
from time import time
from tabulate import tabulate
import threading
from collections import deque
from math import ceil
from enum import Enum
from functools import partial
from copy import deepcopy

In [2]:

class LightsOutPuzzle:
    def __init__(self, board: List[List[int]]):
        self.board = np.array(board, dtype=np.uint8)
        self.size = len(board)

    def toggle(self, x, y):
        for dx, dy in [(0, 0), (1, 0), (-1, 0), (0, 1), (0, -1)]:
            nx, ny = x + dx, y + dy
            if 0 <= nx < self.size and 0 <= ny < self.size:
                self.board[nx, ny] ^= 1

    def is_solved(self):
        return np.all(self.board == 0)

    def get_moves(self):
        return [(x, y) for x in range(self.size) for y in range(self.size)]

    def __str__(self):
        return '\n'.join(' '.join(str(cell) for cell in row) for row in self.board)


In [3]:
def create_random_board(size: int,  seed: int, num_toggles: int = None):
    random.seed(time() if seed is None else seed)

    board = [[0 for _ in range(size)] for _ in range(size)]
    puzzle = LightsOutPuzzle(board)

    if num_toggles is None:
        num_toggles = random.randint(1, size * size)

    for _ in range(num_toggles):
        x = random.randint(0, size - 1)
        y = random.randint(0, size - 1)
        puzzle.toggle(x, y)

    return puzzle.board

In [4]:
def show_solution(
    puzzle: LightsOutPuzzle,
    solution: List[Tuple[int, int]],
    algorithm_name: str,
    nodes_visited: int,
    show_steps: bool = False,
):
    print(f"\nSolving with {algorithm_name}:")
    if solution is None:
        print("No solution found.")
        return
    print(f"Solution: {solution}")
    print(f"Nodes visited: {nodes_visited}")
    if show_steps:
        print("\nSolution steps:")
        for i, move in enumerate(solution):
            print(f"\nStep {i+1}: Toggle position {move}")
            puzzle.toggle(*move)
            print(puzzle)
        print(
            "\nFinal state (solved):"
            if puzzle.is_solved()
            else "\nFinal state (not solved):"
        )
        print(puzzle)

In [5]:
tests = [
    create_random_board(3, 42),
    create_random_board(3, 41),
    create_random_board(3, 42, 5),
    create_random_board(4, 42),
    create_random_board(4, 41),
    create_random_board(4, 42, 5),
    create_random_board(5, 42),
    create_random_board(5, 41),
    create_random_board(5, 42, 5),
]

In [6]:
def bfs_solve(puzzle: LightsOutPuzzle)-> Tuple[List[Tuple[int, int]], int]:
    
    queue = deque([(puzzle, [])])                 # initialize the queue
    visited = set()

    while queue:
        current_board, actions = queue.popleft()
        visited.add(str(current_board.board))
        
        for x, y in current_board.get_moves():
            new_board = deepcopy(current_board)   # avoid changing the original given board
            new_board.toggle(x, y)
            new_actions = actions + [(x, y)]      

            if new_board.is_solved():
                return new_actions, len(visited)

            queue.append((new_board, new_actions))# changing to a list of strings so that the set can check the duplication of boards
                                                  # new_board is a 2d array -> set could not get it
    return None, len(visited)


In [7]:
def ids_solve(puzzle: 'LightsOutPuzzle') -> Tuple[List[Tuple[int, int]], int]:
    import time
    max_depth = 0
    time_limit = 5                                      # Time limit in seconds
    start_time = time.time()                            # Record the start time
    visited_nodes = 0
    while time.time() - start_time < time_limit:
        stack = [(deepcopy(puzzle), [], 0)] 
        visited_states = set()                          # Store visited states to avoid cycles
        while stack and time.time() - start_time < time_limit:
            current_board, actions, depth = stack.pop() 
        
            if depth < max_depth:
                if str(current_board.board) not in visited_states:
                    visited_states.add(str(current_board.board))
                
                    for x, y in current_board.get_moves():
                        new_board = deepcopy(current_board)
                        new_board.toggle(x, y)
                        stack.append((new_board, actions + [(x, y)], depth + 1))

                        if new_board.is_solved():
                            return actions + [(x, y)], visited_nodes
                            
                visited_nodes +=1
        max_depth += 1
    return None, visited_nodes

In [8]:
def astar_solve(puzzle: 'LightsOutPuzzle', heuristic: Callable[[LightsOutPuzzle], int]) -> Tuple[List[Tuple[int, int]], int]:
    import time
    visited_nodes = 0  
    time_limit = 120  
    start_time = time.time() 

    priority_queue = []
    state_counter = 0  # prevents Python from trying to compare the LightsOutPuzzle objects directly
    initial_board = (heuristic(puzzle), 0, state_counter, deepcopy(puzzle), [])  # (f, g, counter, board, actions)
    heapq.heappush(priority_queue, initial_board)
    state_counter += 1
    visited_states = set()

    while priority_queue and time.time() - start_time < time_limit:             # Pop the state with the smallest f(n)
        f, g, _, current_board, actions = heapq.heappop(priority_queue)         
        visited_nodes += 1

        if str(current_board.board) not in visited_states:                          # Skip this state if it was already visited
            visited_states.add(str(current_board.board))

            if current_board.is_solved():
                return actions, visited_nodes
            
            for x, y in current_board.get_moves():
                new_board = deepcopy(current_board)
                new_board.toggle(x, y)
                new_actions = actions + [(x, y)]
                h = heuristic(new_board)
                new_g = g + 1
                f = new_g +  h                                                       # f(n) = g(n) + h(n)
                heapq.heappush(priority_queue, (f, new_g , state_counter, new_board, new_actions))  # Add the new state to the priority queue
                state_counter += 1

    return None, visited_nodes


In [9]:
def heuristic1(puzzle: LightsOutPuzzle):
    rows, cols = len(puzzle.board), len(puzzle.board[0])
    visited = [[False] * cols for _ in range(rows)]

    def dfs(x, y):
        stack = [(x, y)]
        directions = [(0, 1), (0, -1), (1, 0), (-1, 0)]

        while stack:
            cx, cy = stack.pop()
            for dx, dy in directions:
                nx, ny = cx + dx, cy + dy
                if 0 <= nx < rows and 0 <= ny < cols and not visited[nx][ny] and puzzle.board[nx][ny] == 1:
                    visited[nx][ny] = True
                    stack.append((nx, ny))

    clusters = 0
    for i in range(rows):
        for j in range(cols):
            if puzzle.board[i][j] == 1 and not visited[i][j]:
                clusters += 1
                visited[i][j] = True
                dfs(i, j)
                
    return clusters

def heuristic2(puzzle: LightsOutPuzzle):

    return sum(sum(row) for row in puzzle.board)     # not consistent


heuristics = [heuristic1 , heuristic2]

In [10]:
def weighted_heuristic1(puzzle: LightsOutPuzzle, alpha = 2):
  return heuristic1(puzzle) * alpha

def weighted_heuristic2(puzzle: LightsOutPuzzle, alpha = 2):
  return heuristic2(puzzle) * alpha

weighted_heuristics = [weighted_heuristic1 , weighted_heuristic2]

weights = [2, 1000]

In [11]:
class SearchStatus(Enum):
    SOLVED = "Solved"
    TIMEOUT = "Timeout"
    NO_SOLUTION = "No Solution"

@dataclass
class SearchResult:
    result: SearchStatus
    solution: Any = None
    nodes_visited: int = None
    time: float = None

    def __str__(self):
        output = f"Result: {self.result.value}\n"
        if self.result == SearchStatus.SOLVED:
            output += f"Solution: {self.solution}\n"
        if self.result != SearchStatus.TIMEOUT:
            output += f"Nodes Visited: {self.nodes_visited}\n"
            output += f"Time Taken: {self.time:.3f} seconds\n"

        return output


DEFAULT_SEARCH_RESULT = SearchResult(SearchStatus.TIMEOUT, None, None, float("inf"))

In [12]:
def run_searches(func: Callable, args: Any, time_limit: float = 60.0, name: str = "") -> SearchResult:
    result = []

    def target():
        try:
            solution, nodes_visited = func(*args)
            result.append((solution, nodes_visited))
        except Exception as e:
            result.append(e)

    thread = threading.Thread(target=target)
    thread.start()

    start_time = time()
    thread.join(timeout=time_limit)

    if thread.is_alive() or not result:
        print(f"\nTime limit of {time_limit} seconds exceeded for {name}")
        return SearchResult(SearchStatus.TIMEOUT)

    if isinstance(result[0], Exception):
        raise result[0]

    solution, nodes_visited = result[0]
    show_solution(args[0], solution, name, nodes_visited)
    time_taken = time() - start_time
    return SearchResult(SearchStatus.SOLVED if solution else SearchStatus.NO_SOLUTION, solution, nodes_visited, time_taken)

In [13]:
def run_test(test: List, heuristics: List[Callable[[Any], int]], time_limit: float = 60.0):

    print(f"Running test:\n {test}")

    puzzle = LightsOutPuzzle(test)

    bfs_result = run_searches(bfs_solve, (deepcopy(puzzle),), name="BFS", time_limit=time_limit)
    ids_result = run_searches(ids_solve, (deepcopy(puzzle),), name="IDS", time_limit=time_limit)

    astar_results = []
    for heuristic in heuristics:
        astar_results.append(
            run_searches(
                astar_solve,
                (deepcopy(puzzle), heuristic),
                name=f"A*({heuristic.__name__})",
                time_limit=time_limit,
            )
        )

    weighted_astar_results = []
    for heuristic in weighted_heuristics:
        for weight in weights:
          weighted_astar_results.append(
              run_searches(
                  astar_solve,
                  (deepcopy(puzzle), partial(heuristic, alpha = weight)),
                  name=f"weighted A*({heuristic.__name__}, alpha = {weight})",
                  time_limit=time_limit,
              )
          )

    print("\n-------------------------------\n")

    return (
        test,
        bfs_result,
        ids_result,
        *astar_results,
        *weighted_astar_results
    )

In [14]:
results: List[Tuple[str, SearchResult, SearchResult, SearchResult, SearchResult]] = []

for test in tests:
    results.append(run_test(test, heuristics, 180))

Running test:
 [[1 1 1]
 [1 1 1]
 [1 0 0]]

Solving with BFS:
Solution: [(0, 2), (1, 0)]
Nodes visited: 4

Solving with IDS:
Solution: [(1, 0), (0, 2)]
Nodes visited: 7

Solving with A*(heuristic1):
Solution: [(0, 2), (1, 0)]
Nodes visited: 7

Solving with A*(heuristic2):
Solution: [(1, 0), (0, 2)]
Nodes visited: 4

Solving with weighted A*(weighted_heuristic1, alpha = 2):
Solution: [(0, 2), (1, 0)]
Nodes visited: 3

Solving with weighted A*(weighted_heuristic1, alpha = 1000):
Solution: [(0, 2), (1, 0)]
Nodes visited: 3

Solving with weighted A*(weighted_heuristic2, alpha = 2):
Solution: [(1, 0), (0, 2)]
Nodes visited: 6

Solving with weighted A*(weighted_heuristic2, alpha = 1000):
Solution: [(1, 0), (0, 2)]
Nodes visited: 7

-------------------------------

Running test:
 [[1 1 0]
 [1 0 0]
 [1 1 1]]

Solving with BFS:
Solution: [(0, 0), (0, 1), (1, 0), (1, 1), (1, 2)]
Nodes visited: 137

Solving with IDS:
Solution: [(1, 1), (1, 0), (1, 2), (0, 1), (0, 0)]
Nodes visited: 423

Solving w

In [15]:
table_data: List[List[str]] = []

for result in results:
    table_data.append(
        [
            "\n".join(" ".join(str(cell) for cell in row) for row in result[0]),
            str(result[1]),
            str(result[2]),
        ]
    )
    for i in range(len(heuristics)):
        table_data[-1].append(str(result[3 + i]))
    for i in range(len(weighted_heuristics)):
        for j in range(len(weights)):
          table_data[-1].append(str(result[3 + len(heuristics) + i * 2 + j]))
    table_data.append("\n\n\n")
if table_data and table_data[-1] == "\n\n\n":
    table_data.pop()
    table_data.pop()

print(len(table_data))
print("Results:")
print(
    tabulate(
        table_data,
        headers=[
            "Test",
            "BFS",
            "IDS",
            *[f"A* ({heuristic.__name__})" for heuristic in heuristics],
            *[f"Weighted A* ({heuristic.__name__}, alpha = {weight})" for heuristic in weighted_heuristics for weight in weights]
        ],
        floatfmt=".3f",
        numalign="center",
        stralign="center",
        tablefmt="grid",
    )
)

16
Results:
+-----------+----------------------------------------------------+----------------------------------------------------+----------------------------------------------------+--------------------------------------------------------------------+--------------------------------------------------------------------+------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|   Test    |                        BFS                         |                        IDS                         |                  A* (heuristic1)                   |                          A* (heuristic2)                           |            Weighted 