### IFN680 Sokoban Assignment


The functions and classes defined in this module will be called by a marker script. 
You should complete the functions and classes according to their specified interfaces.

You are not allowed to change the defined interfaces.
That is, changing the formal parameters of a function will break the 
interface and triggers to a fail for the test of your code.
 

In [1]:
# Imports

import search
import sokoban
import copy
import math

# !unzip warehouses.zip

%run sokoban.py

In [4]:
def my_team():
    '''
    Return the list of the team members of this assignment submission as a list
    of triplet of the form (student_number, first_name, last_name)
    '''
    return [ (11351519, 'William', 'Hung'), (11371188, 'Komei', 'Soda') ]

In [12]:
def taboo_cells(warehouse):
    '''  
    Identify the taboo cells of a warehouse. A cell inside a warehouse is 
    called 'taboo' if whenever a box get pushed on such a cell then the puzzle 
    becomes unsolvable.  
    When determining the taboo cells, you must ignore all the existing boxes, 
    simply consider the walls and the target cells.  
    Use only the following two rules to determine the taboo cells;
     Rule 1: if a cell is a corner inside the warehouse and not a target, 
             then it is a taboo cell.
     Rule 2: all the cells between two corners inside the warehouse along a 
             wall are taboo if none of these cells is a target.
    
    @param warehouse: a Warehouse object

    @return
       A string representing the puzzle with only the wall cells marked with 
       an '#' and the taboo cells marked with an 'X'.  
       The returned string should NOT have marks for the worker, the targets,
       and the boxes.  
    '''
    
    # set constants
    wall_cell = '#'
    target_cell = '.'
    goal_cell = '*'
    space_cell = ' '
    worker_cell = '!'
    taboo_cell = 'X'
    
    warehouse = [list(row) for row in str(warehouse).split('\n')]
 
    rows, cols = len(warehouse), len(warehouse[0])
    
    # set the cells in the warehouse
    # wall cells -> '#'
    walls = {(r, c) for r in range(rows) for c in range(cols) if warehouse[r][c] == wall_cell}
    # targets are cells that should be filled with boxes -> '.'
    targets = {(r, c) for r in range(rows) for c in range(cols) if warehouse[r][c] == target_cell}
    # goals are cells that already filled with boxes -> '*'
    goals = {(r, c) for r in range(rows) for c in range(cols) if warehouse[r][c] == goal_cell}
    
    #Identify outside coordinate
    outside_box = []
    walls = sorted(list(walls), key=lambda x: (x[0], x[1]))
     
    for row in range(rows):
        row_values = [col for col in walls if col[0] == row]
        if row_values:
            min_col = min(row_values, key=lambda x: x[1])[1]
            max_col = max(row_values, key=lambda x: x[1])[1]
        
        for col in range(cols):
            if col < min_col or max_col < col :
                outside_box.append((row,col))
       
    def is_inside_cell(cell):
        '''
        Checking if the cell is inside the warehouse
        If cell is inside then it must have walls on the same col both sides
        and walls on the same row both sides
        '''
        r, c = cell
        has_left_wall = any(cell == wall_cell for cell in warehouse[r][:c])
        has_right_wall = any(cell == wall_cell for cell in warehouse[r][c+1:])
        has_top_wall = any(row[c] == wall_cell for row in warehouse[:r])
        has_bot_wall = any(row[c] == wall_cell for row in warehouse[r+1:])
        
        return has_left_wall and has_right_wall and has_top_wall and has_bot_wall
    
    # Rule 1: corner cells
    def is_corner_cell(cell):
        '''
        cells that are in the corner:
        has at least one wall on the right/left
        and at least one wall above/below
        '''
        
        # assign row and column
        r, c = cell
        
        # set the var for number of walls count
        num_td_walls = 0
        num_lr_walls = 0
        
        # count num of top/down wall
        if warehouse[r][c] == space_cell:  
            if (warehouse[r+1][c] == wall_cell) or (warehouse[r-1][c] == wall_cell):
                num_td_walls += 1
            # count num of right/left wall
            if (warehouse[r][c+1] == wall_cell) or (warehouse[r][c-1] == wall_cell):
                num_lr_walls += 1       
        
        return (num_td_walls >= 1) and (num_lr_walls >= 1) and (warehouse[r][c] != target_cell) and (warehouse[r][c] not in outside_box) and is_inside_cell(cell)
        
    taboo_cells_list = [(r, c) for r in range(1, rows - 1) for c in range(1, cols - 1) if is_corner_cell((r, c)) and (r, c) not in outside_box and is_inside_cell((r, c))]
   
    temp = copy.deepcopy(taboo_cells_list)
    
    # Rule 2: cell between corners
    # mark cells between two corners along a wall as taboo
    for cell_1 in range(len(temp)-1):
        for cell_2 in range(cell_1+1,len(temp)):
            r1, c1 = temp[cell_1]
            r2, c2 = temp[cell_2]
            
            not_wall_count = 0
            not_target_or_wall = True
        
            if r1 == r2: # cells in the same row
                
                #check if there is target or walls between two corners
                for c in range(min(c1, c2) + 1, max(c1, c2)):
                    if (warehouse[r1][c] == target_cell) or (warehouse[r1][c] == goal_cell) or (warehouse[r1][c] == worker_cell) or (warehouse[r1][c] == wall_cell) :
                        not_target_or_wall = False
                
                if not_target_or_wall :
                    #check wall above
                    if warehouse[r1-1][c1] == wall_cell:
                        for c in range(min(c1, c2) + 1, max(c1, c2)):
                            if warehouse[r1-1][c] != wall_cell:
                                not_wall_count+= 1
                    
                     #check wall below
                    elif warehouse[r1+1][c1] == wall_cell:
                        for c in range(min(c1, c2) + 1, max(c1, c2)):
                            if warehouse[r1+1][c] != wall_cell:
                                not_wall_count+= 1
                    
                    #append coordinates of 'X'
                    if not_wall_count == 0 : 
                        for c in range(min(c1, c2) + 1, max(c1, c2)):
                            taboo_cells_list.append((r1, c))
            
            if c1 == c2: # cells in the same row
                
                #check if there is target or walls between two corners
                for r in range(min(r1, r2) + 1, max(r1, r2)):
                    if (warehouse[r][c1] == target_cell) or (warehouse[r][c1] == goal_cell) or (warehouse[r][c1] == worker_cell) or (warehouse[r][c1] == wall_cell) :
                        not_target_or_wall = False
                
                if not_target_or_wall :
                    #check wall left
                    if warehouse[r1][c1-1] == wall_cell:
                        for r in range(min(r1, r2) + 1, max(r1, r2)):
                            if warehouse[r][c1-1] != wall_cell:
                                not_wall_count += 1
                    
                    #check wall right
                    elif warehouse[r1][c1+1] == wall_cell:
                         for r in range(min(r1, r2) + 1, max(r1, r2)):
                            if warehouse[r][c1+1] != wall_cell:
                                not_wall_count += 1
                    
                    #append coordinates of 'X'
                    if not_wall_count == 0 : 
                        for r in range(min(r1, r2) + 1, max(r1, r2)):  
                            taboo_cells_list.append((r, c1))   
                                
    
    for row in range(len(warehouse)):
        for col in range(len(warehouse[0])):
            if (row,col) in taboo_cells_list:
                warehouse[row][col] = taboo_cell
            if (warehouse[row][col] != taboo_cell) and (warehouse[row][col] != wall_cell):
                warehouse[row][col] = space_cell

    return '\n'.join([''.join(row) for row in warehouse])

In [None]:
def direction_to_action(direction):
    '''
    change the string direction to action
    from 'Right' -> (1, 0)
    '''
    if direction == 'Left':
        return (-1, 0)
    elif direction == 'Right':
        return (1, 0)
    elif direction == 'Up':
        return (0, -1)
    elif direction == 'Down':
        return (0, 1)
    else:
        raise ValueError(f'Invalid Direction: {direction}')

In [None]:
def manhattan_distance(p1, p2):
    """
    Calculate the Manhattan distance between two points.
    """
    return abs(p1[0]-p2[0]) + abs(p1[1]-p2[1])

In [None]:
def euclidean_distance(p1, p2):
    """
    Calculate the Euclidean distance between two points.
    """
    return math.sqrt((p1[0] - p2[0]) ** 2 + (p1[1] - p2[1]) ** 2)

In [13]:
class SokobanPuzzle(search.Problem):
    '''
    An instance of the class 'SokobanPuzzle' represents a Sokoban puzzle.
    An instance contains information about the walls, the targets, the boxes
    and the worker.

    Your implementation should be fully compatible with the search functions of 
    the provided module 'search.py'. 
    
    Each instance should have at least the following attributes
    - self.allow_taboo_push
    - self.macro
    
    When self.allow_taboo_push is set to True, the 'actions' function should 
    return all possible legal moves including those that move a box on a taboo 
    cell. If self.allow_taboo_push is set to False, those moves should not be
    included in the returned list of actions.
    
    If self.macro is set True, the 'actions' function should return 
    macro actions. If self.macro is set False, the 'actions' function should 
    return elementary actions.
    
    Revisit the sliding puzzle and the pancake puzzle for inspiration!
    
    Note that you will need to add several functions to 
    complete this class. For example, a 'result' function is needed
    to satisfy the interface of 'search.Problem'.
    
    '''

    def __init__(self, warehouse, goal=None, allow_taboo_push=False, macro=False):
        self.warehouse = warehouse
        self.walls = tuple(warehouse.walls)
        self.worker = tuple(warehouse.worker)
        self.boxes = tuple(warehouse.boxes)
        self.targets = warehouse.targets
        self.allow_taboo_push = allow_taboo_push
        self.macro = macro
        self.initial = (tuple(warehouse.worker),) + tuple(warehouse.boxes)
        self.taboo_cell_list = set(find_2D_iterator(taboo_cells(warehouse).split("\n"), "X"))
        
    def actions(self, state):
        """
        Return the list of actions that can be executed in the given state.
        
        As specified in the header comment of this class, the attributes
        'self.allow_taboo_push' and 'self.macro' should be tested to determine
        what type of list of actions is to be returned.
        """
        # split the state into worker and boxes
        worker = tuple(state[0])
        boxes = tuple(state[1:])
        
        ele_actions = []
        macro_actions = []
        
        if self.macro == False:
            '''
            return elementary actions, 
            for example: ['Left', 'Down', 'Down']
            ''' 
            for dx, dy, action_str in [(-1, 0, 'Left'), (1, 0, 'Right'), (0, -1, 'Up'), (0, 1, 'Down')]:
                new_worker = (worker[0] + dx, worker[1] + dy)

                # new_worker position is not the wall
                if new_worker not in self.walls:

                    # if worker pushes a box
                    if new_worker in boxes:

                        # set the new_box position
                        new_box = (new_worker[0] + dx, new_worker[1] + dy)
                        # print("new box", new_box)

                        # if new_box position is not wall/box
                        if new_box not in self.walls and new_box not in boxes:
                            # print("Box is moved")
                            
                            # if allow taboo push, append the action
                            if self.allow_taboo_push == True:
                                ele_actions.append(action_str)
                                
                            elif (self.allow_taboo_push == False) and (new_box not in self.taboo_cell_list):
                                ele_actions.append(action_str)

                    # if worker is moving without pushing a box
                    else:
                        ele_actions.append(action_str)
            
            # print(f'ele_action list: {ele_actions}')
            return ele_actions
            
        else:
            '''
            return a list of macro actions
            For example M = [ ((3,4),'Left') , ((5,2),'Up'), ((12,4),'Down') ]
            only append a action that can push a box
            '''
            self.warehouse.worker = worker
            self.warehouse.boxes = boxes
            
            # search for pushable position and move the worker to the position and add it to the macro_actions
            for box in boxes:
                # this seq is the pos of the box: dx->box's x-axis, dy->box's y-axis, push_direction->direction where the worker push the box
                for dx, dy, push_direction in [(-1, 0, 'Right'), (1, 0, 'Left'), (0, -1, 'Down'), (0, 1, 'Up')]:
                    new_worker = (box[0] + dx, box[1] + dy)
                    pushed_box = (box[0] + dx*(-1), box[1] + dy*(-1))
                    
                    if new_worker not in boxes and new_worker not in self.walls:
                    
                        if can_go_there(self.warehouse, (box[1] + dy, box[0] + dx)):
                    
                            if pushed_box not in boxes and pushed_box not in self.walls:

                                    if self.allow_taboo_push == True:
                                        macro_actions.append((new_worker, push_direction))

                                    elif (self.allow_taboo_push == False) and (pushed_box not in self.taboo_cell_list):
                                        macro_actions.append((new_worker, push_direction))      

            return macro_actions
    
    
    def result(self, state, action):
        """
        Return the successor of the given state 
        after executing the given action.
        """

        worker = list(state[0])
        boxes = list(state[1:])
        
        # return elementary actions
        if self.macro == False and action is not None:
            dx, dy = direction_to_action(action)
            new_worker = (worker[0] + dx, worker[1] + dy)
            # print("new_worker", new_worker)
            
            for idx, box in enumerate(boxes):
                if new_worker == box:
                    boxes[idx] = (box[0] + dx, box[1] + dy)

        elif self.macro == True and action is not None:
            (pos_x, pos_y), direction = action
            dx, dy = direction_to_action(direction)
            
            for idx, box in enumerate(boxes):
                # print(f'---For loop for {box}---')
                
                # set new_worker to new position(pos_x, pos_y)
                new_worker = (pos_x + dx, pos_y + dy)
                new_box = (box[0] + dx, box[1] + dy)

                # if worker is pushing a box
                if new_worker == box:
                    new_worker = new_worker
                    # print(f'box move from {box}->{new_box} ')
                    boxes[idx] = new_box

                else:
                    # print('This is not the box being pushed...')
                    # print(f'new_worker == boxes: {new_worker == box}')
                    pass
                
        else:
            # raise error if something is wrong
            raise ValueError('There is nothing in action')
        
        # set the state to current state
        boxes = tuple(boxes)
        current_state = (tuple(new_worker),) + boxes
        self.warehouse.worker = new_worker
        self.warehouse.boxes = boxes

        # print("current_state", current_state)
        return current_state
        
    
    def goal_test(self, state):
        """
        Return True if the state is a goal state; False otherwise.
        """
        # check every box if boxes is in goal
        for box in state[1:]:
            if box not in self.targets:
                return False
            
        return True
        
            
    def h(self, n):
        """
        Return the heuristic value for a given node.
        """
        worker, boxes = n.state[0], n.state[1:]
        boxes = list(boxes)
        dst = []
        h_value = 0
        
        for box in boxes:
            for target in self.targets:
                dst.append(manhattan_distance(box, target))
                h_value += min(dst) * 0.8

        return h_value
    
    
    # Each step cost 1
    def value(self, state):
        return 1

In [14]:
def check_action_seq(warehouse, action_seq):
    '''
    
    Determine if the sequence of actions listed in 'action_seq' is legal or not.
    Checking both macro_actions and ele_actions sequences
    
    Important notes:
      - a legal sequence of actions does not necessarily solve the puzzle.
      - an action is legal even if it pushes a box onto a taboo cell.
        
    @param warehouse: a valid Warehouse object

    @param action_seq: a sequence of legal actions.
           For example, ['Left', 'Down', Down','Right', 'Up', 'Down']
           
    @return
        The string 'Failure', if one of the action was not successul.
           For example, if the agent tries to push two boxes at the same time,
                        or push one box into a wall.
        Otherwise, if all actions were successful, return                 
               A string representing the state of the puzzle after applying
               the sequence of actions.  This must be the same string as the
               string returned by the method  Warehouse.__str__()
    '''
    
    # Extract data from warehouse and swap row and col
    boxes = [list((x,y)) for x, y in warehouse.boxes]
    worker = list(warehouse.worker)
    walls = [list((x, y)) for x, y in warehouse.walls]
    
    FAIL_SEQ = 'Failure'

    # return 'Failure' when solving is impossible
    if action_seq[0] == 'Impossible':
        return FAIL_SEQ
    
    for direction in action_seq:
        # check if the move is available
        # if not -> 'Failure'
        dx, dy = direction_to_action(direction)

        #update workers position
        worker[0] += dx
        worker[1] += dy
        #print("worker", worker)

        # check if the worker is in a wall
        if worker in walls:
            # print('FAIL!!! worker in wall')
            return FAIL_SEQ

        #check boxes
        for box_index, box in enumerate(boxes):
            # when worker pushes a box
            if worker == box :
                # print('Worker pushes a box')
                box[0] = box[0] + dx
                box[1] = box[1] + dy

                # check if the box is in a wall
                if box in walls:
                    # print('FAIL!!! box in wall')
                    return FAIL_SEQ

                # check if worker is pushing two boxes
                other_boxes = [other_box for i, other_box in enumerate(boxes) if i != box_index]

                # check if box is in other boxes
                if box in other_boxes:
                    # print('FAIL!!! box in box')
                    return FAIL_SEQ
    
    warehouse.boxes = tuple(tuple(box) for box in boxes)
    warehouse.worker = tuple(worker)
    
    return warehouse.__str__()

In [15]:
def solve_sokoban_elem(warehouse):
    '''    
    This function should solve using elementary actions 
    the puzzle defined in a file.
    
    @param warehouse: a valid Warehouse object

    @return
        If puzzle cannot be solved return the string 'Impossible'
        If a solution was found, return a list of elementary actions that solves
            the given puzzle coded with 'Left', 'Right', 'Up', 'Down'
            For example, ['Left', 'Down', Down','Right', 'Up', 'Down']
            If the puzzle is already in a goal state, simply return []
    '''

    def find_path(goal_node):
        '''
        Helper function to find the list of action from node list if the possible solution is
        found to fix the Sokoban Puzzle

        @param goal_node: the list of nodes has reached the goal test

        @return
                the list of actions from every node 
        '''
        if goal_node == None:
            return ['Impossible']

        path = goal_node.path()
        step_move = []
        for step in path:
            if step.action is not None:
                step_move.append(step.action)
        return step_move
    
    wh_copy = warehouse.copy()
    
    # solve the puzzle with ele_action -> macro=False
    puzzle = SokobanPuzzle(wh_copy, macro=False)
    state = (warehouse.worker,) + tuple(warehouse.boxes)
    goal_state = puzzle.goal_test(state)
    
    # solution = find_path(search.breadth_first_graph_search(puzzle))
    # solution = find_path(search.depth_first_graph_search(puzzle))
    solution = find_path(search.astar_graph_search(puzzle, puzzle.h))
    # print(solution)
    
    if goal_state:
        return []
    
    elif (solution is None or check_action_seq(warehouse, solution) == "Failure"):
        return ['Impossible']
    
    else:    
        return solution

In [None]:
class FindPathProblem(search.Problem):
    '''
    Set another class for can_go_there as a copy of SokobanPuzzle
    But the boxes can not be pushed
    '''
    
    def __init__(self, initial, warehouse, goal):
        self.initial = initial
        self.goal = goal
        self.warehouse = warehouse
        
    def value(self, state):
        return 1
    
    def result(self, state, action):
        new_state = (state[0] + action[0], state[1] + action[1])   
        return new_state
    
    def actions(self, state):
        for action in ((0, 1), (0, -1), (1, 0), (-1, 0)):
            new_state = self.result(state, action)
            
            if new_state not in self.warehouse.boxes and new_state not in self.warehouse.walls:
                yield action

In [8]:
def can_go_there(warehouse, dst):
    '''    
    Determine whether the worker can walk to the cell dst=(row,column) 
    without pushing any box.
    
    @param warehouse: a valid Warehouse object

    @return
      True if the worker can walk to cell dst=(row,column) without pushing any box
      False otherwise
    '''

    # dst is given as (row, col), changed it to (x, y)
    dst = (dst[1], dst[0])
    
    # set the heuristic as the manhattan distance
    def h(n):
        state = n.state
        return manhattan_distance(state, dst)
    
    # using a* search as the search algorithm
    solution = search.astar_graph_search(FindPathProblem(warehouse.worker, warehouse, dst), h)
    
    # return True or False
    return solution is not None

In [9]:
def solve_sokoban_macro(warehouse):
    '''    
    Solve using macro actions the puzzle defined in the warehouse passed as
    a parameter. A sequence of macro actions should be 
    represented by a list M of the form
            [ ((r1,c1), a1), ((r2,c2), a2), ..., ((rn,cn), an) ]
    For example M = [ ((3,4),'Left') , ((5,2),'Up'), ((12,4),'Down') ] 
    means that the worker first goes the box at row 3 and column 4 and pushes it left,
    then goes to the box at row 5 and column 2 and pushes it up, and finally
    goes the box at row 12s and column 4 and pushes it down.
    
    @param warehouse: a valid Warehouse object

    @return
        If puzzle cannot be solved return the string 'Impossible'
        Otherwise return M a sequence of macro actions that solves the puzzle.
        If the puzzle is already in a goal state, simply return []
    '''
    
    def find_path(goal_node):
        '''
        Helper function to find the list of action from node list if the possible solution is
        found to fix the Sokoban Puzzle

        @param goal_node: the list of nodes has reached the goal test

        @return
                the list of actions from every node 
        '''
        if goal_node == None:
            return ['Impossible']

        path = goal_node.path()
        # print('path', path)
        step_move = []
        for step in path:
            if step.action is not None:
                step_move.append(step.action)
        return step_move
    
    # solve the puzzle with macro_action -> macro=True
    puzzle = SokobanPuzzle(warehouse, macro=True)
    state = (warehouse.worker,) + tuple(warehouse.boxes)
    goal_state = puzzle.goal_test(state)

    # solution = find_path(search.breadth_first_graph_search(puzzle))
    # solution = find_path(search.depth_first_graph_search(puzzle))
    solution = find_path(search.astar_graph_search(puzzle, puzzle.h))
    # print("solution", solution)

    if goal_state:
        return []
    
    elif solution == ['Impossible']:
        return ['Impossible']
    
    else:
        # the returned solution is in ((x, y), direction)
        # (x, y) is the position of the worker is about the push the box
        changed_solution = []
        
        for (pos_x, pos_y), direction in solution:
            dx, dy = direction_to_action(direction)
            pos_x += dx
            pos_y += dy
            changed_solution.append(((pos_y, pos_x), direction))
            
        return changed_solution

In [None]:
# import time

# %run sokoban.py
# # TEST
# start_time = time.time()
# solve_sokoban_macro(wh)
# end_time = time.time()
# print("Test for 'solve_sokoban_macro' with 'a* search'")
# print(f"Time: {end_time - start_time}s")

##########################################################

TESTING RESULT for `solve_sokoban_macro`  
  
<----------------------Warehouse_05---------------------->  
Test for `solve_sokoban_macro` with `BFS`  
Total Time: 5.216866970062256s  
Test for `solve_sokoban_macro` with `a* search`  
Total Time: 0.9392697811126709s  

<----------------------Warehouse_07---------------------->  
Test for `solve_sokoban_macro` with `BFS`  
Total Time: 57.92066931724548s  
Test for `solve_sokoban_macro` with `a* search`  
Total Time: 20.722663164138794s  

<----------------------Warehouse_103---------------------->  
Test for `solve_sokoban_macro` with 'BFS'  
Total Time: 22.497217655181885s  
Test for `solve_sokoban_macro` with `a* search`  
Total Time: 1.2030234336853027s  