# AI 201 Programming Assignment 1
## A* Algorithm Implementation

Submitted by: 
Jan Lendl R. Uy, 2019-00312

In [71]:
# CONSTANTS

# Relative path to the input file
INPUT_FILE_PATH = "astar_in.txt"

# Set a maximum number of iterations for A* to avoid infinite loop
MAX_ALGORITHM_ITERATIONS = 50000

## File Handling
This function reads an input .txt file containing the start and goal states of the puzzle then converts the puzzle contents into a two-dimensional tuple of tile values. The input file is stored in the same directory as the notebook file with the filename of "astar_in.txt".

In [72]:
def read_file(path):
    
    with open(path, "r") as file:
        lines = file.readlines()

    # Initialize lists to store the start state and goal state
    start_state = []
    goal_state = []

    # Initialize a flag to keep track if following tiles belong to the start
    # or the goal state
    reading_part = None

    for line in lines:
        line = line.strip() # Remove leading/trailing whitespace
        if line == "start":
            reading_part = "start"
        elif line == "goal":
            reading_part = "goal"
        elif line and reading_part:
            # Convert line into a list, handling '*' and converting numbers to integers
            line_list = [int(x) if x.isdigit() else x for x in line.split()]
            if reading_part == "start":
                start_state.append(line_list)
            elif reading_part == "goal":
                goal_state.append(line_list)
                
    start_state = tuple(tuple(row) for row in start_state)
    goal_state = tuple(tuple(row) for row in goal_state)

    return start_state, goal_state

start, goal = read_file(INPUT_FILE_PATH)
print(f"Start State: {start}")
print(f"Goal State: {goal}")

Start State: ((2, 1, 6), (4, '*', 8), (7, 5, 3))
Goal State: ((1, 2, 3), (8, '*', 4), (7, 6, 5))


## PuzzleBoard
This class encapsulates the variables of interest of the 8-puzzle, which include the following:

- Board contents
- Coordinates of the empty tile
- Valid movements

Among the operators overridden in this class are the `__str__` method to "prettify" the printing of the 8-puzzle and the `__eq__` and `__hash__` for checking of equal board contents. Aside from this, the following methods were implemented to simplify the operations of the game:

- get_empty_tile_coords: Retrieves the coordinates of the empty space, with the origin set as the top leftmost tile
- get_valid_movements: Checks for all the possible tile movements based on the coordinates of the empty space
- add_coordinates: Returns the sum of two coordinates for tile movement tracking
- swap: Swaps the location of two entities in the puzzle, thus simulating the movement of tiles since the other entity is always the empty space

In [73]:
class PuzzleBoard:
    
    def __init__(self, board_as_2d_tuple):
        self.board_contents = board_as_2d_tuple
        self.empty_tile_coords = self.get_empty_tile_coords()
        self.valid_movements = self.get_valid_movements()   
        
        self.board_in_str = ""
        for row in self.board_contents:
            # print(f"row = {row}")
            for tile in row:
                # print(f"tile = {tile}")
                self.board_in_str += f"| {tile} "
            self.board_in_str += "|\n"
       
    # Print override
    # Formats the Puzzle printing as a 3x3 grid-like structure 
    def __str__(self):
        return self.board_in_str
    
    # Equality override
    # Checks if two boards have exactly the same elements in 
    # the same location 
    def __eq__(self, other_puzzle):
        return isinstance(other_puzzle, PuzzleBoard) and self.board_contents == other_puzzle.board_contents
    
    # Hash override
    # Uses board contents as basis for equality check
    def __hash__(self):
        return hash(tuple(self.board_contents))
    
    def get_board_as_string(self):
        return self.board_in_str 
    
    def add_coordinates(self, coords_1, coords_2):
        return tuple(map(lambda a, b: a + b, coords_1, coords_2))    
    
    def swap(self, coords_orig, coords_new):
        x, y = coords_orig
        new_x, new_y = coords_new
        board_copy = list(list(row) for row in self.board_contents)
        board_copy[x][y], board_copy[new_x][new_y] = board_copy[new_x][new_y], board_copy[x][y]
        return tuple(tuple(row) for row in board_copy)
        
    def get_valid_movements(self):
        # Define possible movements: up, down, left, right
        movements = {"up": (-1, 0), "down": (1, 0), "left": (0, -1), "right": (0, 1)}
        
        # Define a dictionary to store valid movements and the corresponding 
        # indices of swaps
        self.valid_movements = {}
        
        # Get valid movements
        for movement in movements:
            check_valid_movement = self.add_coordinates(self.empty_tile_coords, movements[movement])
            x_moved, y_moved = check_valid_movement
            # new_coords.append(check_valid_movement)
            if (x_moved >= 0 and x_moved < 3) and (y_moved >= 0 and y_moved < 3):
                self.valid_movements[movement] = check_valid_movement
            
        return self.valid_movements     
    
    def get_empty_tile_coords(self):
        empty_tile = None
        for row in self.board_contents:
            for tile in row:
                if tile == "*":
                    empty_tile = (self.board_contents.index(row), row.index(tile))
        return empty_tile

starting_puzzle = PuzzleBoard(start)
goal_puzzle = PuzzleBoard(goal)
print(f"Start State: \n{starting_puzzle}")
print(f"Goal State: \n{goal_puzzle}")

Start State: 
| 2 | 1 | 6 |
| 4 | * | 8 |
| 7 | 5 | 3 |

Goal State: 
| 1 | 2 | 3 |
| 8 | * | 4 |
| 7 | 6 | 5 |



## Node
This class represents the configuration of the puzzle as the A* algorithm searches for the optimal sequence leading to the goal state. It has the following attributes:

- Value: Contents of the 8-puzzle
- Parent node
- Cost functions `f(n)`, `g(n)`, and `h(n)`

Among the operators overridden in this class are the `__str__` method which returns the "prettified" visualization of the puzzle and the `__eq__` and `__hash__` for checking of equal board contents. Aside from this, it has the method `path()` which retrieves the sequence of puzzle configurations from the starting/initial state to the goal state.

In [74]:
class Node:
    def __init__(self, value, parent=None, g=0, h=0, p=0, s=0):
        self.value = value
        self.parent = parent

        # Cost functions: g(n), h(n), and f(n)
        self.g = g
        self.h = h
        self.f = g + h

        # Nilsson score functions: P(n) and S(n)
        # Only set if the heuristic to be used is the Nilsson score
        self.p = p
        self.s = s
    
    def __str__(self):
        return self.value.get_board_as_string()
    
    def __hash__(self):
        return hash(self.value)
    
    def __eq__(self, other):
        return isinstance(other, Node) and self.value == other.value
    
    def set_p(self, p):
        self.p = p

    def set_s(self, s):
        self.s = s

    def path(self):
        node, pth = self, []
        while node:
            pth.append(node)
            node = node.parent
        return pth[::-1]

## HeuristicFunction
This object is used to encapsulate the three heuristic functions used for the computation of the h(n) component of the total path cost function f(n). Among the heuristic functions used for the A* algorithm are:

1. Number of tiles in the wrong position
2. Manhattan distance of each tile to the correct position
3. Nilsson sequence score

In [75]:
class HeuristicFunction:

    def __init__(self, current_board, goal_board, heuristic):
        self.value = None
        self.seq_score = None
        self.manhattan_score = None
        if heuristic == "number_of_tiles_in_wrong_position":
            self.value = self.number_of_tiles_in_wrong_position(current_board, goal_board)
        elif heuristic == "manhattan_distance":
            self.value = self.manhattan_distance(current_board, goal_board)
        elif heuristic == "nilsson_sequence_score":
            self.value, self.manhattan_score, self.seq_score = self.nilsson_sequence_score(current_board, goal_board)
        else:
            raise ValueError("Invalid or unsupported heuristic function! Valid functions are number_of_tiles_in_wrong_position, manhattan_distance, and nilsson_sequence_score")

    def number_of_tiles_in_wrong_position(self, current_board, goal_board):
        wrong_counter = 0
        
        for i in range(len(current_board)):
            for j in range(len(current_board[i])):
                current_tile = current_board[i][j]
                if current_tile == "*":
                    continue
                if current_tile != goal_board[i][j]:
                    wrong_counter += 1
        return wrong_counter

    def manhattan_distance(self, current_board, goal_board):
        current_board = list(list(row) for row in current_board)
        goal_board = list(list(row) for row in goal_board)
        
        distance = 0
        # Flatten the boards into 1D lists
        if isinstance(current_board[0], list):
            current_board = [tile for row in current_board for tile in row]
        if isinstance(goal_board[0], list):
            goal_board = [tile for row in goal_board for tile in row]
            
        for i, tile in enumerate(current_board):
            if tile != goal_board[i] and tile != "*":  # Exclude the blank space represented by *
                # Find the index of the current tile in the goal state
                goal_index = goal_board.index(tile)
                # Convert the current index and goal index into (row, col) format
                current_pos = divmod(i, 3)
                goal_pos = divmod(goal_index, 3)
                # Calculate the Manhattan distance and add it to the total
                distance += abs(current_pos[0] - goal_pos[0]) + abs(current_pos[1] - goal_pos[1])
        return distance

    def nilsson_sequence_score(self, current_board, goal_board):

        current_board = list(list(row) for row in current_board)
        goal_board = list(list(row) for row in goal_board)

        manhattan_dist = self.manhattan_distance(current_board, goal_board)

        # Flatten the boards if they are 2D lists for easier index computation
        if isinstance(current_board[0], list):
            current_board = [tile for row in current_board for tile in row]
        if isinstance(goal_board[0], list):
            goal_board = [tile for row in goal_board for tile in row]
        
        # The correct clockwise ordering of the tiles around the periphery
        goal_order = goal_board[0:3] + [goal_board[5], goal_board[8], goal_board[7], goal_board[6], goal_board[3]]
        current_order = current_board[0:3] + [current_board[5], current_board[8], current_board[7], current_board[6], current_board[3]]

        # Calculate sequence score
        goal_pairs = []
        current_pairs = []
        sequence_score = 0
        for i in range(-1, -len(goal_order), -1):
            current_pair = (current_order[i], current_order[i+1])
            goal_pair = (goal_order[i], goal_order[i+1])
            goal_pairs.append(goal_pair)
            if current_pair[0] == "*":
                continue
            current_pairs.append(current_pair)

        for pair in current_pairs:
            if pair not in goal_pairs:
                sequence_score += 2

        # Check the center square, add 1 if it is not the empty square
        if current_board[4] != "*":
            sequence_score += 1
        
        # Multiply the sequence score by 3 and add the Manhattan distance
        return 3 * sequence_score + manhattan_dist, manhattan_dist, sequence_score
    
    def get_heuristic_value(self):
        return self.value
    
    def get_sequence_score(self):
        return self.seq_score
    
    def get_manhattan_score(self):
        return self.manhattan_score

## A* Algorithm
The implementation of the A* algorithm below follows the pseudocode from Slide 21 of Lecture 3B. It follows all 8 steps in the pseudocode but with an additional code for ending the search for a maximum number of iterations when the algorithm does not converge to the goal state. The default heuristic function is the number of tiles in the wrong position. The heuristic parameter must be explicitly set if another heuristic function is desired for the search.

In [76]:
def a_star(start, goal, heuristic="number_of_tiles_in_wrong_position"):
    
    print("Running A* Algorithm")
    print(f"Start Node: \n{start}")
    print(f"Goal Node: \n{goal}")
    
    # Variables to be used throughout the algorithm
    open = set()
    closed = set()
    path_to_goal = None # Will be assigned a list of Nodes if the goal state is found
    ctr = 0 # Counter to break non-converging search
    
    # Cost variables
    search_cost = 0
    heuristic_function = HeuristicFunction(start.board_contents, 
                                                        goal.board_contents, heuristic)
    
    h = heuristic_function.get_heuristic_value()
    if heuristic == "nilsson_sequence_score":
        p = heuristic_function.get_manhattan_score()
        s = heuristic_function.get_sequence_score()
        start_node = Node(start, h=h, p=p, s=s)
    else:
        start_node = Node(start, h=h)
    goal_node = Node(goal)
        
    # Step 1:
    # Push starting node to open
    open.add(start_node)
    current_node = start_node
    
    while ctr < MAX_ALGORITHM_ITERATIONS:
                    
        # Step 2: 
        # If open is empty, return failure and end search
        # Else, continue search
        if len(open) == 0:
            # print("No solution found.")
            break
        
        # Step 3:
        # Remove from OPEN the node whose f value is smallest and put it on a list called closed. 
        # Save this node as the value of current_node
        node_lowest_f = min(open, key=lambda node: node.f)
        current_node = node_lowest_f
        open.remove(node_lowest_f)
        closed.add(node_lowest_f)
        # print(f"Current node: \n{current_node} \
        #             \nf(n) = {current_node.f} \
        #             \ng(n) = {current_node.g} \
        #             \nh(n) = {current_node.h}\n")
        
        # Step 4:
        # If current node is the goal node, get the path from the start node to
        # the goal node and end the search
        if (current_node == goal_node):
            print(f"Goal achieved!")
            print(f"Algorithm took {ctr+1} iterations!")
            path_to_goal = current_node.path()
            break
        
        # Step 5:
        # Expand current node to get its successor. Each successor node has its f(n)
        # automatically computed and assigned upon instantiation
        successor_nodes = get_successors(current_node, goal_node, heuristic)
        search_cost += len(successor_nodes)
        if len(successor_nodes) == 0:
            continue
        
        # Step 6 and 7: 
        # If successor node is not yet in open/closed, immediately put the node in open
        # Else, compare if successor node or the current node has the lower f(n)
        # print(f"Opened Nodes:")
        successor_count = 1
        for successor_node in successor_nodes:
            # print(f"Successor node {successor_count}: \n{successor_node} \
            #         \nf(n) = {successor_node.f} \
            #         \ng(n) = {successor_node.g} \
            #         \nh(n) = {successor_node.h}\n")
            if (successor_node not in open) and (successor_node not in closed):
                open.add(successor_node)
            elif successor_node in open:
                # print(f"Successor node is already in open!")
                temp_open = list(open)
                for opened_node in temp_open:
                    if opened_node == successor_node:
                        previous_f = opened_node.f
                        if previous_f < successor_node.f:
                            successor_node.f = previous_f
            else:
                # print(f"Successor node is already in closed!")
                temp_closed = list(closed)
                for closed_node in temp_closed:
                    if closed_node == successor_node:
                        previous_f = closed_node.f
                        if previous_f < successor_node.f:
                            # print(f"parent node = {successor_node.parent}")
                            if not closed_node.parent:
                                successor_node.f = previous_f
                                successor_node.parent = current_node
                            else:
                                successor_node = closed_node
                            closed.remove(successor_node)
                            open.add(closed_node)
                        break
            successor_count += 1
     
        ctr += 1

    # Print necessary cost information
    print(f"Search cost in terms of generated nodes = {search_cost}")

    return path_to_goal
    
def get_successors(current, goal, heuristic="number_of_tiles_in_wrong_position"):
    
    successors = []
    current_board = current.value
    goal_board = goal.value
            
    # Move the tile according to the specified direction
    for movement in current_board.valid_movements:
        successor = current_board.swap(current_board.empty_tile_coords, current_board.valid_movements[movement])
        successor_puzzle = PuzzleBoard(successor)
        heuristic_function = HeuristicFunction(successor_puzzle.board_contents, 
                                               goal_board.board_contents, 
                                               heuristic)
        
        if heuristic == "nilsson_sequence_score":
            successor_node = Node(successor_puzzle,
                                parent = current,
                                g = current.g+1, 
                                h = heuristic_function.get_heuristic_value(),
                                p = heuristic_function.get_manhattan_score(),
                                s = heuristic_function.get_sequence_score())
        else:
            successor_node = Node(successor_puzzle,
                                parent = current,
                                g = current.g+1, 
                                h = heuristic_function.get_heuristic_value())
        successors.append(successor_node)
        
    return successors

## Test Runs

### Heuristic Function: Number of Tiles in Wrong Position

In [77]:
path_wrong_tiles = a_star(starting_puzzle, goal_puzzle, "number_of_tiles_in_wrong_position")
step_count = 0
if path_wrong_tiles:
    print(f"\nPath from start to goal:")
    for node in path_wrong_tiles:
        print(f"Step {step_count}:\
                    \nf(n) = {node.f}, h(n) = {node.h}, g(n) = {node.g}")
        print(node)
        step_count += 1
else:
    print("No solution found.")

Running A* Algorithm
Start Node: 
| 2 | 1 | 6 |
| 4 | * | 8 |
| 7 | 5 | 3 |

Goal Node: 
| 1 | 2 | 3 |
| 8 | * | 4 |
| 7 | 6 | 5 |

Goal achieved!
Algorithm took 17457 iterations!
Search cost in terms of generated nodes = 51451

Path from start to goal:
Step 0:                    
f(n) = 7, h(n) = 7, g(n) = 0
| 2 | 1 | 6 |
| 4 | * | 8 |
| 7 | 5 | 3 |

Step 1:                    
f(n) = 8, h(n) = 7, g(n) = 1
| 2 | 1 | 6 |
| 4 | 8 | * |
| 7 | 5 | 3 |

Step 2:                    
f(n) = 9, h(n) = 7, g(n) = 2
| 2 | 1 | * |
| 4 | 8 | 6 |
| 7 | 5 | 3 |

Step 3:                    
f(n) = 10, h(n) = 7, g(n) = 3
| 2 | * | 1 |
| 4 | 8 | 6 |
| 7 | 5 | 3 |

Step 4:                    
f(n) = 11, h(n) = 7, g(n) = 4
| 2 | 8 | 1 |
| 4 | * | 6 |
| 7 | 5 | 3 |

Step 5:                    
f(n) = 12, h(n) = 7, g(n) = 5
| 2 | 8 | 1 |
| 4 | 6 | * |
| 7 | 5 | 3 |

Step 6:                    
f(n) = 13, h(n) = 7, g(n) = 6
| 2 | 8 | 1 |
| 4 | 6 | 3 |
| 7 | 5 | * |

Step 7:                    
f(n) = 13, h(n

### Heuristic Function: Manhattan Distance

In [78]:
path_manhattan = a_star(starting_puzzle, goal_puzzle, "manhattan_distance")
step_count = 0
if path_manhattan:
    print(f"\nPath from start to goal:")
    for node in path_manhattan:
        print(f"Step {step_count}:\
                    \nf(n) = {node.f}, h(n) = {node.h}, g(n) = {node.g}")
        print(node)
        step_count += 1
else:
    print("No solution found.")

Running A* Algorithm
Start Node: 
| 2 | 1 | 6 |
| 4 | * | 8 |
| 7 | 5 | 3 |

Goal Node: 
| 1 | 2 | 3 |
| 8 | * | 4 |
| 7 | 6 | 5 |

Goal achieved!
Algorithm took 1158 iterations!
Search cost in terms of generated nodes = 3445

Path from start to goal:
Step 0:                    
f(n) = 12, h(n) = 12, g(n) = 0
| 2 | 1 | 6 |
| 4 | * | 8 |
| 7 | 5 | 3 |

Step 1:                    
f(n) = 12, h(n) = 11, g(n) = 1
| 2 | 1 | 6 |
| 4 | 8 | * |
| 7 | 5 | 3 |

Step 2:                    
f(n) = 12, h(n) = 10, g(n) = 2
| 2 | 1 | * |
| 4 | 8 | 6 |
| 7 | 5 | 3 |

Step 3:                    
f(n) = 14, h(n) = 11, g(n) = 3
| 2 | * | 1 |
| 4 | 8 | 6 |
| 7 | 5 | 3 |

Step 4:                    
f(n) = 16, h(n) = 12, g(n) = 4
| 2 | 8 | 1 |
| 4 | * | 6 |
| 7 | 5 | 3 |

Step 5:                    
f(n) = 16, h(n) = 11, g(n) = 5
| 2 | 8 | 1 |
| 4 | 6 | * |
| 7 | 5 | 3 |

Step 6:                    
f(n) = 16, h(n) = 10, g(n) = 6
| 2 | 8 | 1 |
| 4 | 6 | 3 |
| 7 | 5 | * |

Step 7:                    
f(n) =

### Heuristic Function: Nilsson Sequence Score

In [79]:
path_nilsson = a_star(starting_puzzle, goal_puzzle, "nilsson_sequence_score")
step_count = 0
if path_manhattan:
    print(f"\nPath from start to goal:")
    for node in path_nilsson:
        print(f"Step {step_count}:\
                    \nf(n) = {node.f}, h(n) = {node.h}, g(n) = {node.g}, P(n) = {node.p}, S(n) = {node.s}")
        print(node)
        step_count += 1
else:
    print("No solution found.")

Running A* Algorithm
Start Node: 
| 2 | 1 | 6 |
| 4 | * | 8 |
| 7 | 5 | 3 |

Goal Node: 
| 1 | 2 | 3 |
| 8 | * | 4 |
| 7 | 6 | 5 |

Goal achieved!
Algorithm took 62 iterations!
Search cost in terms of generated nodes = 178

Path from start to goal:
Step 0:                    
f(n) = 54, h(n) = 54, g(n) = 0, P(n) = 12, S(n) = 14
| 2 | 1 | 6 |
| 4 | * | 8 |
| 7 | 5 | 3 |

Step 1:                    
f(n) = 51, h(n) = 50, g(n) = 1, P(n) = 11, S(n) = 13
| 2 | 1 | 6 |
| 4 | 8 | * |
| 7 | 5 | 3 |

Step 2:                    
f(n) = 51, h(n) = 49, g(n) = 2, P(n) = 10, S(n) = 13
| 2 | 1 | * |
| 4 | 8 | 6 |
| 7 | 5 | 3 |

Step 3:                    
f(n) = 53, h(n) = 50, g(n) = 3, P(n) = 11, S(n) = 13
| 2 | * | 1 |
| 4 | 8 | 6 |
| 7 | 5 | 3 |

Step 4:                    
f(n) = 52, h(n) = 48, g(n) = 4, P(n) = 12, S(n) = 12
| 2 | 8 | 1 |
| 4 | * | 6 |
| 7 | 5 | 3 |

Step 5:                    
f(n) = 49, h(n) = 44, g(n) = 5, P(n) = 11, S(n) = 11
| 2 | 8 | 1 |
| 4 | 6 | * |
| 7 | 5 | 3 |

Step 6:

## Sanity Check:
All test runs for the three different heuristic functions must have the same path costs. Note that there can be different paths from the starting puzzle to the goal puzzle for the same minimum path cost.

In [80]:
print(f"Path cost for number of wrong tiles h(n): {len(path_wrong_tiles)}")
print(f"Path cost for Manhattan distance h(n): {len(path_manhattan)}")
print(f"Path cost for Nilsson sequence score h(n): {len(path_nilsson)}")

Path cost for number of wrong tiles h(n): 19
Path cost for Manhattan distance h(n): 19
Path cost for Nilsson sequence score h(n): 19
