# CA#1 AI Course
## Fall 2024
### Simin Eskandari
University of Tehran

In [5]:
import random
import heapq
from dataclasses import dataclass, field
from typing import List, Tuple, Callable, Any, Set
import numpy as np
from time import time
from tabulate import tabulate
import threading
import collections
from math import ceil
from enum import Enum
from functools import partial
from copy import deepcopy

In [6]:

class LightsOutPuzzle:
    """A Lights Out board stored as a NumPy array of 0 (off) and 1 (on).

    ``size`` is the number of rows of the board passed to the constructor;
    the same value is used as the bound for both coordinates.
    """

    def __init__(self, board: List[List[int]]):
        # np.array builds a new array, so toggles never write into `board`.
        self.board = np.array(board, dtype=np.uint8)
        self.size = len(board)

    def toggle(self, x, y):
        """Flip cell (x, y) and whichever of its four neighbours lie on the board."""
        limit = self.size
        candidates = ((x, y), (x + 1, y), (x - 1, y), (x, y + 1), (x, y - 1))
        for row, col in candidates:
            if row < 0 or row >= limit or col < 0 or col >= limit:
                continue  # neighbour falls outside the board
            self.board[row, col] ^= 1

    def is_solved(self):
        """Return a true value when every cell of the board is 0."""
        return (self.board == 0).all()

    def get_moves(self):
        """Return every (x, y) coordinate of the board, row by row."""
        moves = []
        for x in range(self.size):
            for y in range(self.size):
                moves.append((x, y))
        return moves

    def __str__(self):
        # One text line per board row, cells separated by single spaces.
        lines = []
        for row in self.board:
            lines.append(' '.join(map(str, row)))
        return '\n'.join(lines)


In [7]:
def create_random_board(size: int, seed: int, num_toggles: int = None):
    """Build a board by applying random presses to an all-off board.

    Args:
        size: Side length of the square board.
        seed: Seed for the ``random`` module; when None the current time is used.
        num_toggles: Number of random presses to apply. When None, a count
            between 1 and ``size * size`` is drawn from the seeded generator.

    Returns:
        The board array of the resulting puzzle.
    """
    if seed is None:
        random.seed(time())
    else:
        random.seed(seed)

    puzzle = LightsOutPuzzle([[0] * size for _ in range(size)])

    presses = random.randint(1, size * size) if num_toggles is None else num_toggles

    for _ in range(presses):
        # Row is drawn before column so the random sequence stays reproducible.
        row = random.randint(0, size - 1)
        col = random.randint(0, size - 1)
        puzzle.toggle(row, col)

    return puzzle.board

In [8]:
def show_solution(
    puzzle: LightsOutPuzzle,
    solution: List[Tuple[int, int]],
    algorithm_name: str,
    nodes_visited: int,
    show_steps: bool = False,
):
    """Print a solver's result and, optionally, replay it on ``puzzle``.

    Args:
        puzzle: Puzzle the moves are applied to when ``show_steps`` is true;
            it is modified in place during the replay.
        solution: Sequence of (x, y) presses, or None when nothing was found.
        algorithm_name: Label printed in the header line.
        nodes_visited: Node count reported by the solver.
        show_steps: When true, print the board after every move and the final state.
    """
    print(f"\nSolving with {algorithm_name}:")
    if solution is None:
        print("No solution found.")
        return

    print(f"Solution: {solution}")
    print(f"Nodes visited: {nodes_visited}")
    if not show_steps:
        return

    print("\nSolution steps:")
    for step_number, move in enumerate(solution, start=1):
        print(f"\nStep {step_number}: Toggle position {move}")
        puzzle.toggle(*move)
        print(puzzle)

    if puzzle.is_solved():
        banner = "\nFinal state (solved):"
    else:
        banner = "\nFinal state (not solved):"
    print(banner)
    print(puzzle)

In [9]:
# Benchmark boards: for each size, seed 42 and seed 41 with a random number of
# presses, then seed 42 with exactly 5 presses.
tests = [
    create_random_board(size, seed, num_toggles)
    for size in (3, 4, 5)
    for seed, num_toggles in ((42, None), (41, None), (42, 5))
]

### Breadth-First Search (BFS)
### Benefits:
* Optimality: BFS guarantees finding the shortest path in an unweighted graph.

* Simplicity: The algorithm is straightforward to implement and understand.
### Disadvantages:
* Memory Usage: BFS can use a lot of memory because it stores all nodes at the current depth level before moving to the next level.
    
* Inefficiency for Deep Solutions: BFS can be inefficient for problems with a large state space or deep solutions due to its exhaustive exploration.

In [10]:
# TODO: Must return a list of tuples and the number of visited nodes
from collections import deque                   # This is a double-ended queue from Python's collections module,
                                                # which allows fast append and pop operations on both ends. 


def bfs_solve(puzzle: LightsOutPuzzle) -> Tuple[List[Tuple[int, int]], int]:
    """Breadth-first search for a shortest sequence of presses that clears the board.

    Args:
        puzzle: Starting position. It is only read: successors are generated on
            a private scratch copy, so the caller's puzzle keeps its board.

    Returns:
        ``(moves, nodes_visited)``. ``moves`` is the list of (x, y) presses
        leading from the start to the all-off board, and ``nodes_visited`` is
        the number of states taken off the queue. ``moves`` is ``[]`` both when
        the start is already solved and when the queue runs empty without
        reaching the goal.
    """
    start_board = puzzle.board.copy()

    # All toggling happens on this private puzzle, never on the caller's object.
    scratch = deepcopy(puzzle)

    queue = deque([(start_board, [])])      # entries are (board, moves so far)
    visited = {start_board.tobytes()}       # arrays are unhashable; raw bytes are
    nodes_visited = 0

    while queue:
        current_board, path = queue.popleft()
        nodes_visited += 1

        # is_solved() only reads, so the queued array can be attached directly.
        scratch.board = current_board
        if scratch.is_solved():
            return path, nodes_visited

        for x, y in scratch.get_moves():
            # Fresh copy per move: toggle() edits the array in place.
            scratch.board = current_board.copy()
            scratch.toggle(x, y)

            state_key = scratch.board.tobytes()
            if state_key not in visited:
                visited.add(state_key)
                # The array is never touched again: the next iteration rebinds
                # scratch.board to a new copy before toggling.
                queue.append((scratch.board, path + [(x, y)]))

    return [], nodes_visited
# This function returns a tuple of two things:
# A list of moves (x, y) that solve the puzzle.
# The number of nodes visited during the BFS process.

### Depth-First Search (DFS)
### Benefits:
* Space Efficiency: DFS uses O(b*d) memory, which is often lower than BFS in practice, especially for deep trees.

* Easier to Implement: It is generally easier to implement than other algorithms, requiring less bookkeeping.
### Disadvantages:
* Not Optimal: DFS does not guarantee the shortest path; it may find a solution quickly but not necessarily the best one.

* Risk of Getting Stuck: DFS can get trapped in deep branches of the search tree and may not explore all possibilities, especially if there are cycles.

In [11]:
def depth_limited_search(puzzle: LightsOutPuzzle, depth: int, path: List[Tuple[int, int]], visited: set, nodes_visited: int) -> Tuple[bool, List[Tuple[int, int]], int]:
    """Depth-first search from ``puzzle`` that goes at most ``depth`` moves deeper.

    Args:
        puzzle: State to expand.
        depth: Number of further moves allowed below this state.
        path: Moves taken to reach ``puzzle``; expected to be ``[]`` at the
            root call so its length equals the current tree depth.
        visited: Set shared by the whole search and updated in place. Plain
            state keys (``board.tobytes()``) placed in it by the caller are
            never entered again. This function adds ``(state_key, remaining)``
            pairs, where ``remaining`` is the depth budget the state was
            expanded with.
        nodes_visited: Running count of expanded states.

    Returns:
        ``(solved, path, nodes_visited)``. On success ``path`` is the full move
        list to the solved board; otherwise it is the ``path`` argument.

    A state is skipped only if it was already expanded with at least as much
    remaining depth. Skipping on a depth-blind "seen before" test is unsound
    for a depth-limited search: a state first reached by a long path (for
    example m1, m2, m1, which equals m2) would block the short path to it,
    hiding solutions that lie within the limit.
    """
    nodes_visited += 1
    if puzzle.is_solved():
        return True, path, nodes_visited
    if depth == 0:
        return False, path, nodes_visited

    remaining = depth - 1
    # No earlier visit in this search can have had more budget than the overall
    # limit minus one, which is remaining + len(path) when the root path is [].
    most_remaining = remaining + len(path)

    for x, y in puzzle.get_moves():
        new_puzzle = LightsOutPuzzle([list(row) for row in puzzle.board])
        new_puzzle.toggle(x, y)
        new_state = new_puzzle.board.tobytes()

        # Bare keys are seeded by the caller (e.g. the root state).
        if new_state in visited:
            continue
        # Already expanded with an equal or larger budget: nothing new below it.
        if any((new_state, left) in visited for left in range(remaining, most_remaining + 1)):
            continue

        visited.add((new_state, remaining))
        # Recurse with one less move available.
        solved, result_path, nodes_visited = depth_limited_search(new_puzzle, remaining, path + [(x, y)], visited, nodes_visited)
        if solved:
            return True, result_path, nodes_visited

    # No move from this state reaches the goal within the budget.
    return False, path, nodes_visited

### Iterative Deepening Search (IDS)
#### Benefits:

* Space Efficiency: IDS combines the benefits of DFS and BFS, using O(b*d) memory (where b is the branching factor and d is the depth of the solution).
* Optimality: Like BFS, IDS is guaranteed to find the optimal solution if the cost is uniform, as it explores all nodes at depth 𝑑 before moving to 𝑑+1.
#### Disadvantages:

* Time Inefficiency: It can be slower than A* or BFS since it revisits nodes multiple times, leading to redundant calculations.
* Depth Limitations: If the depth of the solution is not known and is very deep, IDS can take a long time to find the solution.

In [12]:
# TODO: Must return a list of tuples and the number of visited nodes
def ids_solve(puzzle: LightsOutPuzzle) -> Tuple[List[Tuple[int, int]], int]:
    """Iterative deepening: rerun the depth-limited search with limits 0, 1, 2, ...

    Returns:
        ``(moves, nodes_visited)`` where ``nodes_visited`` is summed over every
        iteration. The loop has no upper bound on the limit, so it returns only
        once ``depth_limited_search`` reports success.
    """
    total_nodes_visited = 0
    limit = 0
    while True:
        # Each iteration starts from a clean set holding only the start state.
        seen = {puzzle.board.tobytes()}
        found, moves, expanded = depth_limited_search(puzzle, limit, [], seen, 0)
        total_nodes_visited += expanded
        if found:
            return moves, total_nodes_visited
        limit += 1

### A* Search
### Benefits:

* Optimality: A* is guaranteed to find the least-cost path to the goal if the heuristic is admissible (never overestimates the cost).
* Efficiency: It uses both the cost to reach a node and a heuristic to estimate the cost to the goal, allowing it to explore more promising paths first.
* Flexibility: Different heuristics can be used depending on the problem domain, allowing for customized performance.
### Disadvantages:

* Memory Consumption: A* stores all generated nodes in memory, which can be prohibitive for large state spaces.
* Dependence on Heuristic Quality: The efficiency of A* is heavily dependent on the quality of the heuristic used; a poor heuristic can degrade its performance to that of BFS.

In [13]:
from queue import PriorityQueue
from typing import List, Tuple, Callable, Set
from copy import deepcopy
import numpy as np

def astar_solve(puzzle: LightsOutPuzzle, heuristic: Callable[[LightsOutPuzzle], int]) -> Tuple[List[Tuple[int, int]], int]:
    """A* search over board states with unit cost per press.

    Args:
        puzzle: Starting position; it is not modified.
        heuristic: Maps a puzzle to an estimate of the presses still needed.

    Returns:
        ``(moves, nodes_visited)``. ``moves`` is the press sequence to the
        all-off board, or ``[]`` if the frontier empties first (also ``[]``
        when the start is already solved). ``nodes_visited`` counts states
        actually expanded; outdated frontier entries are not counted.

    States are keyed by ``board.tobytes()`` like the other solvers. The best
    known cost per state is tracked, so a state first generated through a long
    path can still be replaced when a cheaper path to it is found later.
    """
    start_key = puzzle.board.tobytes()
    best_cost = {start_key: 0}      # cheapest known number of presses per state

    # Heap entries: (f = g + h, g, insertion order, path, puzzle).
    # The insertion counter settles ties, so paths and puzzles are never compared.
    order = 0
    frontier = [(heuristic(puzzle), 0, order, [], puzzle)]
    nodes_visited = 0

    while frontier:
        _, cost_so_far, _, path, current_puzzle = heapq.heappop(frontier)

        # Lazy deletion: skip entries superseded by a cheaper path.
        if cost_so_far > best_cost[current_puzzle.board.tobytes()]:
            continue

        nodes_visited += 1
        if current_puzzle.is_solved():
            return path, nodes_visited

        new_cost_so_far = cost_so_far + 1   # every press costs one move
        for move in current_puzzle.get_moves():
            new_puzzle = LightsOutPuzzle([list(row) for row in current_puzzle.board])
            new_puzzle.toggle(move[0], move[1])
            new_key = new_puzzle.board.tobytes()

            # Keep the successor only if this path is strictly cheaper.
            if new_cost_so_far >= best_cost.get(new_key, float("inf")):
                continue
            best_cost[new_key] = new_cost_so_far

            order += 1
            estimated_total_cost = new_cost_so_far + heuristic(new_puzzle)
            heapq.heappush(
                frontier,
                (estimated_total_cost, new_cost_so_far, order, path + [move], new_puzzle),
            )

    return [], nodes_visited

### Heuristic 1: Count the Number of Lights Still On

#### How it Works: 
###### This heuristic simply counts the number of lights that are still on in the puzzle. It provides a measure of how far the puzzle is from being solved; fewer lights on means closer to the goal.
* Admissibility: Not admissible. One press toggles up to five lights, so the number of lit cells can exceed the number of presses still required. For example, the plus-shaped pattern produced by pressing the centre of an empty 3×3 board has 5 lights on but is solved in 1 move (h = 5 > 1). Dividing the count by 5 and rounding up would give an admissible bound.
* Consistency: Not consistent. A single press can change the count by anything from −5 to +5, so the condition h(n) ≤ 1 + h(n') does not hold in general (in the example above, 5 > 1 + 0).


### Heuristic 2: Weighted Distance Heuristic

#### How it Works: 
##### This heuristic calculates a weighted sum of the distances of each light that is on from the center of the board. The idea is that lights farther from the center should have a greater impact on the heuristic value, implying that toggling them may have a larger effect on solving the puzzle.
* Admissibility: Not admissible. The summed distances can exceed the number of presses still required: pressing the corner (0, 0) of an empty 3×3 board lights (0, 0), (0, 1) and (1, 0), giving h = 2 + 1 + 1 = 4 although one move solves it.
* Consistency: Not consistent. In the example above a single move drops h from 4 to 0, which violates h(n) ≤ 1 + h(n'). Since h is 0 at the goal, a consistent heuristic would also have to be admissible.

### Heuristic 3: Count Adjacent Lights That Are On

#### How it Works: 
##### This heuristic counts how many lights that are on have adjacent lights that are also on. It reflects the idea that lights that are adjacent can influence each other, and addressing clusters of lights might be more effective than addressing isolated lights.
* Admissibility: Not admissible. For the plus-shaped pattern produced by pressing the centre of an empty 3×3 board, the centre light has four lit neighbours and each of the four arms has one, so h = 8 although one move solves it.
* Consistency: Not consistent. A single press can change the value by more than 1 (from 8 to 0 in the example above), and a press can also raise the value by lighting cells next to lights that are already on.

#### Heuristic 4: Weigh Corners and Edges Differently

#### How it Works: 
##### This heuristic weighs lights differently based on their positions: a lit corner contributes 2 points, a lit edge cell 1 point, and a lit interior cell 3 points. Interior lights get the largest weight on the idea that they interact with more neighbours than corner or edge lights.
* Admissibility: Not admissible. Every lit cell contributes at least 1, so the value is never smaller than the number of lit cells; pressing the centre of an empty 3×3 board gives h = 3 + 4·1 = 7 although one move solves it.
* Consistency: Not consistent. A single press can change the value by far more than 1 (from 7 to 0 in the example above), so h(n) ≤ 1 + h(n') fails.

In [14]:
from typing import List
import numpy as np

# Count the number of lights still on (cells equal to 1)
def heuristic1(puzzle: LightsOutPuzzle) -> int:
    """Return the sum of the board cells, i.e. the number of lit cells on a 0/1 board."""
    # np.sum yields a NumPy scalar; convert so the result is the plain int the
    # annotation promises and later arithmetic stays in Python integers.
    return int(np.sum(puzzle.board))

# Distance heuristic: a lit cell counts more the farther it is from the center
def heuristic2(puzzle: LightsOutPuzzle) -> int:
    """Sum the Manhattan distances from (size // 2, size // 2) over all cells equal to 1."""
    size = puzzle.size
    center = size // 2
    return sum(
        abs(x - center) + abs(y - center)
        for x in range(size)
        for y in range(size)
        if puzzle.board[x, y] == 1
    )


# Count, for every lit cell, how many of its four neighbours are also lit
def heuristic3(puzzle: LightsOutPuzzle) -> int:
    """Return the number of ordered (lit cell, lit neighbour) pairs on the board.

    Each adjacent lit pair is therefore counted twice, once from each side.
    """
    size = puzzle.size
    board = puzzle.board
    lit_links = 0

    for x in range(size):
        for y in range(size):
            if board[x, y] != 1:
                continue
            # Up, down, left, right; off-board neighbours are ignored.
            for nx, ny in ((x - 1, y), (x + 1, y), (x, y - 1), (x, y + 1)):
                if 0 <= nx < size and 0 <= ny < size and board[nx, ny] == 1:
                    lit_links += 1
    return lit_links

# Position-weighted count: interior cells 3, corners 2, other border cells 1
def heuristic4(puzzle: LightsOutPuzzle) -> int:
    """Sum a position-dependent weight over all cells equal to 1."""
    size = puzzle.size
    last = size - 1
    # Indexed by how many of the two coordinates sit on the border:
    # 0 -> interior (3), 1 -> edge (1), 2 -> corner (2).
    weight_by_border_hits = (3, 1, 2)
    total = 0

    for x in range(size):
        x_on_border = x == 0 or x == last
        for y in range(size):
            if puzzle.board[x, y] != 1:
                continue
            y_on_border = y == 0 or y == last
            total += weight_by_border_hits[int(x_on_border) + int(y_on_border)]
    return total
# Heuristics evaluated with plain A*, in reporting order
heuristics = [
    heuristic1,
    heuristic2,
    heuristic3,
    heuristic4,
]


# Heuristics Overview

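| **Heuristic** | **Admissible** | **Consistent** | **Counterexample (press the centre of an empty 3×3 board; 1 move solves it)** |
|---------------|----------------|----------------|--------------------------------------------------------------------------------|
| Heuristic 1   | No             | No             | h = 5                                                                          |
| Heuristic 2   | No             | No             | h = 4 (h = 4 as well for a corner press)                                       |
| Heuristic 3   | No             | No             | h = 8                                                                          |
| Heuristic 4   | No             | No             | h = 7                                                                          |

Because one press toggles up to five lights, each of these values can exceed the true number of remaining moves. They still guide the search, but A* with them is not guaranteed to return a shortest solution.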

### Weighted A* Search
### Benefits:

* Performance Tuning: By adjusting the weight of the heuristic, you can trade-off between optimality and performance, potentially speeding up the search for certain problems.
* Improved Heuristic Use: It can outperform standard A* in scenarios where a faster, less optimal solution is acceptable.
### Disadvantages:

* Non-Optimal Solutions: Depending on the weight used, Weighted A* can produce suboptimal solutions.
* Heuristic Dependency: Like A*, its efficiency is still reliant on the quality of the heuristic.

In [15]:
from typing import Callable

# Original heuristic1: counts the number of "on" lights
def heuristic1(puzzle: LightsOutPuzzle) -> int:
    """Return the sum of the board cells, i.e. the number of lit cells on a 0/1 board."""
    # np.sum yields a NumPy scalar; convert so the result is the plain int the
    # annotation promises and the weighted variant multiplies Python integers.
    return int(np.sum(puzzle.board))

# heuristic1 scaled by a constant factor, for weighted A*
def weighted_heuristic1(puzzle: LightsOutPuzzle, alpha: int = 5) -> int:
    """Return ``heuristic1(puzzle)`` multiplied by ``alpha``."""
    lights_on = heuristic1(puzzle)
    return lights_on * alpha

# Original heuristic2: distance-from-center heuristic
def heuristic2(puzzle: LightsOutPuzzle) -> int:
    """Sum the Manhattan distances from (size // 2, size // 2) over all cells equal to 1."""
    size = puzzle.size
    center = size // 2
    total_distance = 0

    for x in range(size):
        row_offset = abs(x - center)
        for y in range(size):
            if puzzle.board[x, y] != 1:
                continue
            total_distance += row_offset + abs(y - center)
    return total_distance

# heuristic2 scaled by a constant factor, for weighted A*
def weighted_heuristic2(puzzle: LightsOutPuzzle, alpha: int = 5) -> int:
    """Return ``heuristic2(puzzle)`` multiplied by ``alpha``."""
    distance_sum = heuristic2(puzzle)
    return distance_sum * alpha

# Heuristics used by weighted A*; each takes the scaling factor as `alpha`
weighted_heuristics: List[Callable[[LightsOutPuzzle, int], int]] = [weighted_heuristic1, weighted_heuristic2]

# Values of alpha to run with every weighted heuristic
weights = [3, 7]


In [16]:
class SearchStatus(Enum):
    """Possible outcomes of one solver run; the value is the label shown in reports."""

    SOLVED = "Solved"
    TIMEOUT = "Timeout"
    NO_SOLUTION = "No Solution"


@dataclass
class SearchResult:
    """Outcome of one solver run together with its statistics."""

    result: SearchStatus        # how the run ended
    solution: Any = None        # value returned by the solver, if any
    nodes_visited: int = None   # node count reported by the solver
    time: float = None          # elapsed seconds; None when not measured

    def __str__(self):
        """Render the result as newline-terminated report lines.

        The solution is shown only for SOLVED; node count and time are shown
        for every status except TIMEOUT. A missing time is printed as "N/A"
        instead of failing in the float format.
        """
        lines = [f"Result: {self.result.value}"]
        if self.result == SearchStatus.SOLVED:
            lines.append(f"Solution: {self.solution}")
        if self.result != SearchStatus.TIMEOUT:
            lines.append(f"Nodes Visited: {self.nodes_visited}")
            if self.time is None:
                # ":.3f" cannot format None.
                lines.append("Time Taken: N/A")
            else:
                lines.append(f"Time Taken: {self.time:.3f} seconds")

        return "\n".join(lines) + "\n"


# Placeholder result describing a run that timed out.
DEFAULT_SEARCH_RESULT = SearchResult(SearchStatus.TIMEOUT, None, None, float("inf"))

In [17]:
def run_searches(func: Callable, args: Any, time_limit: float = 60.0, name: str = "") -> SearchResult:
    """Run ``func(*args)`` in a worker thread and wait at most ``time_limit`` seconds.

    Args:
        func: Solver returning ``(solution, nodes_visited)``.
        args: Positional arguments for ``func``; ``args[0]`` is also handed to
            ``show_solution`` as the puzzle.
        time_limit: Seconds to wait for the worker before reporting a timeout.
        name: Label used in printed messages.

    Returns:
        A ``SearchResult``. The status is TIMEOUT when the worker has not
        delivered a result in time, SOLVED when the returned solution is
        truthy, and NO_SOLUTION otherwise (so an empty move list is reported
        as NO_SOLUTION).

    Raises:
        Exception: Whatever exception ``func`` raised, re-raised in the caller.

    The clock starts before the worker starts and stops as soon as ``join``
    returns, so the reported time excludes the printing done afterwards.
    A timed-out worker cannot be stopped and keeps running in the background;
    it is a daemon thread so it does not keep the interpreter alive at exit.
    """
    result = []

    def target():
        # Forward the result or the exception to the waiting caller.
        try:
            solution, nodes_visited = func(*args)
            result.append((solution, nodes_visited))
        except Exception as e:
            result.append(e)

    thread = threading.Thread(target=target, daemon=True)

    start_time = time()
    thread.start()
    thread.join(timeout=time_limit)
    time_taken = time() - start_time

    if thread.is_alive() or not result:
        print(f"\nTime limit of {time_limit} seconds exceeded for {name}")
        return SearchResult(SearchStatus.TIMEOUT)

    if isinstance(result[0], Exception):
        raise result[0]

    solution, nodes_visited = result[0]
    show_solution(args[0], solution, name, nodes_visited)
    return SearchResult(SearchStatus.SOLVED if solution else SearchStatus.NO_SOLUTION, solution, nodes_visited, time_taken)

In [18]:
def run_test(test: List, heuristics: List[Callable[[Any], int]], time_limit: float = 60.0):
    """Run every solver on one board and collect the results.

    Order of runs: BFS, IDS, A* once per entry of ``heuristics``, then A* once
    per (weighted heuristic, weight) pair taken from the module-level
    ``weighted_heuristics`` and ``weights``. Every run receives its own deep
    copy of the puzzle.

    Returns:
        A tuple ``(test, bfs_result, ids_result, *astar_results,
        *weighted_astar_results)``.
    """
    print(f"Running test:\n {test}")

    puzzle = LightsOutPuzzle(test)

    def launch(solver, label, *extra_args):
        # Fresh copy per run so no solver sees another solver's changes.
        return run_searches(
            solver,
            (deepcopy(puzzle), *extra_args),
            name=label,
            time_limit=time_limit,
        )

    bfs_result = launch(bfs_solve, "BFS")
    ids_result = launch(ids_solve, "IDS")

    astar_results = [
        launch(astar_solve, f"A*({heuristic.__name__})", heuristic)
        for heuristic in heuristics
    ]

    weighted_astar_results = [
        launch(
            astar_solve,
            f"weighted A*({heuristic.__name__}, alpha = {weight})",
            partial(heuristic, alpha=weight),
        )
        for heuristic in weighted_heuristics
        for weight in weights
    ]

    print("\n-------------------------------\n")

    return (test, bfs_result, ids_result, *astar_results, *weighted_astar_results)

In [19]:
# One tuple per test board, as returned by run_test: the board first, followed
# by one SearchResult per solver run.
results: List[Tuple[Any, ...]] = []

# Every board is run with the full heuristic list and a 180-second limit per solver.
for test in tests:
    results.append(run_test(test, heuristics, 180))

Running test:
 [[1 1 1]
 [1 1 1]
 [1 0 0]]

Solving with BFS:
Solution: [(0, 2), (1, 0)]
Nodes visited: 26

Solving with IDS:
Solution: [(0, 2), (1, 0)]
Nodes visited: 31

Solving with A*(heuristic1):
Solution: [(1, 0), (0, 2)]
Nodes visited: 4

Solving with A*(heuristic2):
Solution: [(1, 0), (0, 2)]
Nodes visited: 5

Solving with A*(heuristic3):
Solution: [(1, 0), (0, 2)]
Nodes visited: 20

Solving with A*(heuristic4):
Solution: [(1, 0), (0, 2)]
Nodes visited: 5

Solving with weighted A*(weighted_heuristic1, alpha = 3):
Solution: [(1, 0), (0, 2)]
Nodes visited: 6

Solving with weighted A*(weighted_heuristic1, alpha = 7):
Solution: [(1, 0), (0, 2)]
Nodes visited: 6

Solving with weighted A*(weighted_heuristic2, alpha = 3):
Solution: [(1, 0), (0, 2)]
Nodes visited: 11

Solving with weighted A*(weighted_heuristic2, alpha = 7):
Solution: [(1, 0), (0, 2)]
Nodes visited: 40

-------------------------------

Running test:
 [[1 1 0]
 [1 0 0]
 [1 1 1]]

Solving with BFS:
Solution: [(0, 0), (0,

In [20]:
# Build one table row per test: the board, followed by the text of every
# solver result in the order run_test returned them.
table_data: List[List[str]] = []

for result in results:
    row = ["\n".join(" ".join(str(cell) for cell in board_row) for board_row in result[0])]
    # result[1:] is BFS, IDS, the A* runs and the weighted A* runs. Taking the
    # slice avoids index arithmetic that depends on how many weights there are.
    row.extend(str(search_result) for search_result in result[1:])
    table_data.append(row)

    # Blank spacer row between consecutive tests.
    table_data.append("\n\n\n")

# Drop the spacer after the last test (guarded so an empty result list is fine).
if table_data and table_data[-1] == "\n\n\n":
    table_data.pop()

print(len(table_data))
print("Results:")
print(
    tabulate(
        table_data,
        headers=[
            "Test",
            "BFS",
            "IDS",
            *[f"A* ({heuristic.__name__})" for heuristic in heuristics],
            *[
                f"Weighted A* ({heuristic.__name__}, alpha = {weight})"
                for heuristic in weighted_heuristics
                for weight in weights
            ],
        ],
        floatfmt=".3f",
        numalign="center",
        stralign="center",
        tablefmt="grid",
    )
)


17
Results:
+-----------+----------------------------------------------------+--------------------------------------------------------------------+------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------+--------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------