# hw1

rename this file into `puzzle.py`

In [1]:
from __future__ import division
from __future__ import print_function

import sys
import math
import time
import queue as Q

import resource

In [32]:
#### SKELETON CODE ####
## The Class that Represents the Puzzle

class PuzzleState(object):
    """
        The PuzzleState stores a board configuration and implements
        movement instructions to generate valid children.
    """
    def __init__(self, config, n, parent=None, action="Initial", cost=0):
        """
        :param config->List : Represents the n*n board, for e.g. [0,1,2,3,4,5,6,7,8] represents the goal state.
        :param n->int : Size of the board
        :param parent->PuzzleState
        :param action->string
        :param cost->int
        """
        if n*n != len(config) or n < 2:
            raise Exception("The length of config is not correct!")
        if set(config) != set(range(n*n)):
            raise Exception("Config contains invalid/duplicate entries : ", config)

        self.n        = n
        self.cost     = cost
        self.parent   = parent
        self.action   = action
        self.config   = config
        self.children = []

        # Get the index and (row, col) of empty block
        self.blank_index = self.config.index(0)

    def __lt__(self, other):
        actions = {}
        actions["Up"] = 1
        actions["Down"] = 2
        actions["Left"] = 3
        actions["Right"] = 4
        return actions[self.action] < actions[other.action]

    def display(self):
        """ Display this Puzzle state as a n*n board """
        for i in range(self.n):
            print(self.config[3*i : 3*(i+1)])
    
    def move_up(self):
        """ 
        Moves the blank tile one row up.
        :return a PuzzleState with the new configuration
        """
        idx0 = self.config.index(0)
        if(idx0 < 3):
            return
        else:
            ps_conf = list(self.config)
            ps = PuzzleState(ps_conf, 3)
            ps.config[idx0] = self.config[idx0-3]
            ps.config[idx0-3] = 0
                
            ps.cost = self.cost+1
            ps.action = "Up"
            ps.parent = self
            self.children.append(ps)
            return ps
      
    def move_down(self):
        """
        Moves the blank tile one row down.
        :return a PuzzleState with the new configuration
        """
        idx0 = self.config.index(0)
        if(idx0 > 5):
            return
        else:
            ps_conf = list(self.config)
            ps = PuzzleState(ps_conf, 3)
            ps.config[idx0] = self.config[idx0+3]
            ps.config[idx0+3] = 0
            
            ps.cost = self.cost+1
            ps.action = "Down"
            ps.parent = self
            self.children.append(ps)
            return ps
      
    def move_left(self):
        """
        Moves the blank tile one column to the left.
        :return a PuzzleState with the new configuration
        """
        idx0 = self.config.index(0)
        if(idx0%3 == 0):
            return
        else:
            ps_conf = list(self.config)
            ps = PuzzleState(ps_conf, 3)
            ps.config[idx0] = self.config[idx0-1]
            ps.config[idx0-1] = 0
            
            ps.cost = self.cost+1
            ps.action = "Left"
            ps.parent = self
            self.children.append(ps)
            return ps

    def move_right(self):
        """
        Moves the blank tile one column to the right.
        :return a PuzzleState with the new configuration
        """
        idx0 = self.config.index(0)
        if(idx0%3 == 2):
            return
        else:
            ps_conf = list(self.config)
            ps = PuzzleState(ps_conf, 3)
            ps.config[idx0] = self.config[idx0+1]
            ps.config[idx0+1] = 0
                    
            ps.cost = self.cost+1
            ps.action = "Right"
            ps.parent = self
            self.children.append(ps)
            return ps
      
    def expand(self):
        """ Generate the child nodes of this node """
        
        # Node has already been expanded
        if len(self.children) != 0:
            return self.children
        
        # Add child nodes in order of UDLR
        children = [
            self.move_up(),
            self.move_down(),
            self.move_left(),
            self.move_right()]

        # Compose self.children of all non-None children states
        self.children = [state for state in children if state is not None]
        return self.children

In [3]:
def construct_path_str(path):
    path_str = "["
    for s in path:
        path_str += ("'" + s + "', ")
    path_str = path_str[:-2]
    path_str += "]"
    return path_str

In [4]:
def construct_path(final_state):
    path = []
    s = final_state
    while s is not None:
        path.append(s.action)
        s = s.parent
    
    path.remove("Initial")
    path.reverse()
    
    return path

In [5]:
# Function that Writes to output.txt
def writeOutput(path, nodes, max_depth, runtime, ram):
    path_str = construct_path_str(path)
    
    f = open("output.txt", "w")
    f.write("path_to_goal: " + path_str + "\n")
    f.write("cost_of_path: " + str(len(path)) + "\n")
    f.write("nodes_expanded: " + str(nodes) + "\n")
    f.write("search_depth: " + str(len(path)) + "\n")
    f.write("max_search_depth: " + str(max_depth) + "\n")
    f.write("running_time: %.8f\n" % runtime)
    f.write("max_ram_usage: %.8f" % ram)
    f.close()

In [6]:
def test_goal(puzzle_state):
    """test the state is the goal state or not"""
    goal_state = [0,1,2,3,4,5,6,7,8]
    return puzzle_state.config == goal_state

In [7]:
def calculate_manhattan_dist(idx, value, n):
    """calculate the manhattan distance of a tile"""
    if (value == 0):
        return 0
    x = idx//n
    y = idx%n
    x_goal = value//n
    y_goal = value%n
    return abs(x - x_goal) + abs(y - y_goal)

In [8]:
def calculate_total_cost(state):
    """calculate the total estimated cost of a state"""
    total_cost = state.cost # g(n)
    
    # adding h(n)
    for i in range(9):
        total_cost += calculate_manhattan_dist(i, state.config[i], 3)
    return total_cost

In [25]:
def bfs_search(initial_state):
    """BFS search"""
    frontier = Q.Queue()
    frontier.put(initial_state)
    explored = set()
    frontier_set = {tuple(initial_state.config)}

    nodes_expanded = -1
    max_depth = -1

    start_time  = time.time()
    start_ram = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        
    while(not frontier.empty()):        
        s = frontier.get()
        s_tuple = tuple(s.config)
        explored.add(s_tuple)
        frontier_set.remove(s_tuple)

        nodes_expanded += 1

        if(test_goal(s)):
            end_time = time.time()
            ram = (resource.getrusage(resource.RUSAGE_SELF).ru_maxrss-start_ram)/(2**20)
            writeOutput(construct_path(s), nodes_expanded, max_depth, end_time-start_time, ram)

        s_children = s.expand()
        if(s.cost + 1 > max_depth):
            max_depth = s.cost + 1 # this has to be here, after the node expansion
        
        for c in s_children:
            c_tuple = tuple(c.config)
            if (c_tuple not in frontier_set) and (c_tuple not in explored):
                frontier.put(c)
                frontier_set.add(c_tuple)

In [26]:
def dfs_search(initial_state):
    """DFS search"""
    frontier = []
    frontier.append(initial_state)
    explored = set()
    frontier_set = {tuple(initial_state.config)}

    nodes_expanded = -1
    max_depth = -1

    start_time  = time.time()
    start_ram = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss

    while(len(frontier)!=0):
        s = frontier.pop()
        s_tuple = tuple(s.config)
        explored.add(s_tuple)
        frontier_set.remove(s_tuple)
        
        nodes_expanded += 1
        if(s.cost > max_depth):
            max_depth = s.cost

        if(test_goal(s)):
            end_time = time.time()
            ram = (resource.getrusage(resource.RUSAGE_SELF).ru_maxrss-start_ram)/(2**20)
            writeOutput(construct_path(s), nodes_expanded, max_depth, end_time-start_time, ram)

        s_children = s.expand()
        s_children.reverse() # this is super important, otherwise it's wrong

        for c in s_children:
            c_tuple = tuple(c.config)
            if (c_tuple not in frontier_set) and (c_tuple not in explored):
                frontier.append(c)
                frontier_set.add(c_tuple)

In [27]:
def A_star_search(initial_state):
    """A * search"""
    frontier = Q.PriorityQueue()
    frontier.put((calculate_total_cost(initial_state), initial_state))
    explored = set()
    frontier_set = {tuple(initial_state.config)}
    
    nodes_expanded = -1
    max_depth = -1

    start_time  = time.time()
    start_ram = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    
    while(not frontier.empty()):        
        s = frontier.get()[1]
        s_tuple = tuple(s.config)

        if (s_tuple in explored):
            continue # avoid checking the same state twice because there might be duplicates when updating it in the priority queue
        
        explored.add(s_tuple)
        frontier_set.remove(s_tuple)

        nodes_expanded += 1
        if(s.cost > max_depth):
            max_depth = s.cost

        if(test_goal(s)):
            end_time = time.time()
            ram = (resource.getrusage(resource.RUSAGE_SELF).ru_maxrss-start_ram)/(2**20)
            writeOutput(construct_path(s), nodes_expanded, max_depth, end_time-start_time, ram)

        s_children = s.expand()
        
        for c in s_children:
            c_tuple = tuple(c.config)
            if (c_tuple not in frontier_set) and (c_tuple not in explored):
                frontier.put((calculate_total_cost(c), c))
                frontier_set.add(c_tuple)
            elif (c_tuple in frontier_set):
                frontier.put((calculate_total_cost(c), c))

In [41]:
# Test main driver
def main():
    # sys.argv[2] = "1,2,5,3,4,0,6,7,8" # test case 1
    # sys.argv[2] = "6,1,8,4,0,2,7,3,5" # test case 2
    # sys.argv[2] = "8,6,4,2,1,3,5,7,0" # test case 3
    # sys.argv[2] = "1,2,0,3,4,5,6,7,8" # test case 4
    # sys.argv[2] = "3,1,2,6,4,5,7,8,0" # test case 5
    sys.argv[2] = "0,8,7,6,5,4,3,2,1" # test case 6
    begin_state = sys.argv[2].split(",")
    begin_state = list(map(int, begin_state))
    board_size  = int(math.sqrt(len(begin_state)))
    hard_state  = PuzzleState(begin_state, board_size)

    # bfs_search(hard_state)
    # dfs_search(hard_state)
    A_star_search(hard_state)

if __name__ == '__main__':
    main()

In [13]:
## Main Function that reads in Input and Runs corresponding Algorithm
def main():
    search_mode = sys.argv[1].lower()
    begin_state = sys.argv[2].split(",")
    begin_state = list(map(int, begin_state))
    board_size  = int(math.sqrt(len(begin_state)))
    hard_state  = PuzzleState(begin_state, board_size)
    start_time  = time.time()
    
    if   search_mode == "bfs": bfs_search(hard_state)
    elif search_mode == "dfs": dfs_search(hard_state)
    elif search_mode == "ast": A_star_search(hard_state)
    else: 
        print("Enter valid command arguments !")
        
    end_time = time.time()
    print("Program completed in %.3f second(s)"%(end_time-start_time))

if __name__ == '__main__':
    main()

ValueError: invalid literal for int() with base 10: '/Users/xintongqi/Library/Jupyter/runtime/kernel-19b9ce7b-da46-4364-a053-162aadfd33d4.json'