In [10]:
import numpy as np
from queue import Queue

from utils import Problem, Node, PriorityQueue

## modeling the problem

In [11]:
class RushHour(Problem):
    """Rush Hour sliding-car puzzle on a 2-D integer grid.

    A state (``parking``) is a NumPy array in which ``0`` marks an empty cell
    and any other value is the code of the car occupying that cell.  A car
    whose cells share one row may slide left/right, a car whose cells share
    one column may slide up/down, and every move shifts the car by one cell.

    The goal is reached when every cell between the target car and the grid
    edge it faces (right edge for a horizontal target, bottom edge for a
    vertical one) is empty.
    """

    def __init__(self, initial, car_code, cars_list) -> None:
        """Store the puzzle definition.

        Args:
            initial: Starting grid.
            car_code: Code of the target car whose lane has to be cleared.
            cars_list: Codes of the cars that ``actions`` tries to move; a
                code that does not appear on a grid simply yields no moves.
        """
        self.initial = initial
        self.car_code = car_code
        self.cars_list = cars_list

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _cells(parking, car_code):
        """Return the (row, col) coordinates of ``car_code`` in row-major order."""
        return np.argwhere(parking == car_code)

    @staticmethod
    def _in_one_row(cells) -> bool:
        """True when ``cells`` is non-empty and all coordinates share a row."""
        return cells.size > 0 and bool(np.all(cells[:, 0] == cells[0, 0]))

    @staticmethod
    def _in_one_col(cells) -> bool:
        """True when ``cells`` is non-empty and all coordinates share a column."""
        return cells.size > 0 and bool(np.all(cells[:, 1] == cells[0, 1]))

    def _lane_ahead(self, parking):
        """Return the cells between the target car and the edge it faces.

        Returns ``None`` when the target car's cells share neither a row nor
        a column.

        Raises:
            IndexError: If the target car is not on the grid.
        """
        cells = self._cells(parking, self.car_code)
        row, col = cells[-1]  # last cell in row-major order: right/bottom end
        if self._in_one_row(cells):
            return parking[row, col + 1:]
        if self._in_one_col(cells):
            return parking[row + 1:, col]
        return None

    def _slide(self, parking, car_code, d_row, d_col) -> None:
        """Shift ``car_code`` by one cell along ``(d_row, d_col)``, in place.

        Raises:
            IndexError: If the car is not on the grid or the move would leave
                the grid (a negative index would otherwise silently wrap
                around to the opposite edge).
            ValueError: If the destination cell is occupied (the move would
                otherwise overwrite another car).
        """
        cells = self._cells(parking, car_code)
        # Row-major order: cells[0] is the top/left end, cells[-1] the
        # bottom/right end of the car.
        if d_row < 0 or d_col < 0:
            front, rear = cells[0], cells[-1]
        else:
            front, rear = cells[-1], cells[0]
        row, col = int(front[0]) + d_row, int(front[1]) + d_col
        n_rows, n_cols = parking.shape
        if not (0 <= row < n_rows and 0 <= col < n_cols):
            raise IndexError(
                f"car {car_code} cannot move to ({row}, {col}): outside the grid"
            )
        if parking[row, col] != 0:
            raise ValueError(
                f"car {car_code} cannot move to ({row}, {col}): cell is occupied"
            )
        parking[rear[0], rear[1]] = 0
        parking[row, col] = car_code

    # ------------------------------------------------------------------
    # search-problem interface
    # ------------------------------------------------------------------
    def goal_test(self, parking) -> bool:
        """Return True when the lane ahead of the target car is empty.

        A target car that is not straight can never satisfy the goal, so
        ``False`` is returned for it.

        Raises:
            IndexError: If the target car is not on the grid.
        """
        lane = self._lane_ahead(parking)
        return lane is not None and not lane.any()

    def actions(self, parking) -> list[tuple]:
        """Return every legal ``(car_code, direction)`` pair for ``parking``.

        Cars are visited in ``cars_list`` order.
        """
        return [
            (car, move)
            for car in self.cars_list
            for move in self.allowed_movements(parking, car)
        ]

    def result(self, state, action) -> np.ndarray:
        """Return a new grid with ``action`` applied; ``state`` is not modified.

        ``action`` is a ``(car_code, direction)`` pair as produced by
        ``actions``.
        """
        car_code, move = action
        new_state = state.copy()
        self.car_move(new_state, car_code, move)
        return new_state

    def allowed_movements(self, parking, car_code) -> list:
        """Return the directions in which ``car_code`` can slide by one cell.

        The result is a subset of ``["L", "R", "U", "D"]`` in that order.  A
        car that is not on the grid has no moves; a single-cell car is
        treated as both horizontal and vertical.
        """
        cells = self._cells(parking, car_code)
        if cells.size == 0:
            return []

        top, left = cells[0]
        bottom, right = cells[-1]
        n_rows, n_cols = parking.shape
        moves = []

        if self._in_one_row(cells):
            if left > 0 and parking[top, left - 1] == 0:
                moves.append("L")
            if right < n_cols - 1 and parking[bottom, right + 1] == 0:
                moves.append("R")

        if self._in_one_col(cells):
            if top > 0 and parking[top - 1, left] == 0:
                moves.append("U")
            if bottom < n_rows - 1 and parking[bottom + 1, right] == 0:
                moves.append("D")

        return moves

    # ------------------------------------------------------------------
    # in-place moves
    # ------------------------------------------------------------------
    def car_move(self, parking, car_code, side: str):
        """Slide ``car_code`` one cell towards ``side``, modifying ``parking``.

        ``side`` is one of ``"U"``, ``"D"``, ``"L"``, ``"R"`` (case
        insensitive); any other value leaves the grid untouched.
        """
        handlers = {
            "U": self.move_up,
            "D": self.move_down,
            "L": self.move_left,
            "R": self.move_right,
        }
        handler = handlers.get(side.upper())
        if handler is not None:
            handler(parking, car_code)

    def move_left(self, parking, car_code):
        """Slide ``car_code`` one column to the left, in place."""
        self._slide(parking, car_code, 0, -1)

    def move_right(self, parking, car_code):
        """Slide ``car_code`` one column to the right, in place."""
        self._slide(parking, car_code, 0, 1)

    def move_up(self, parking, car_code):
        """Slide ``car_code`` one row up, in place."""
        self._slide(parking, car_code, -1, 0)

    def move_down(self, parking, car_code):
        """Slide ``car_code`` one row down, in place."""
        self._slide(parking, car_code, 1, 0)

    # ------------------------------------------------------------------
    # orientation predicates
    # ------------------------------------------------------------------
    def is_vertical(self, parking, car_code: int) -> bool:
        """True when the car is on the grid and all its cells share a column."""
        return self._in_one_col(self._cells(parking, car_code))

    def is_horizontal(self, parking, car_code: int) -> bool:
        """True when the car is on the grid and all its cells share a row."""
        return self._in_one_row(self._cells(parking, car_code))

    # ------------------------------------------------------------------
    # heuristic
    # ------------------------------------------------------------------
    def heuristic(self, parking) -> int:
        """Count the occupied cells in the lane ahead of the target car.

        Raises:
            IndexError: If the target car is not on the grid.
            ValueError: If the target car is not straight, in which case no
                lane (and therefore no estimate) exists.
        """
        lane = self._lane_ahead(parking)
        if lane is None:
            raise ValueError(
                f"target car {self.car_code} is neither horizontal nor vertical"
            )
        return int(np.count_nonzero(lane))

    def h(self, node) -> int:
        """Heuristic value of a search node, computed from ``node.state``."""
        return self.heuristic(node.state)

## search algorithms

In [12]:
def bfs(problem):
    """Breadth-first graph search.

    Args:
        problem: Object exposing ``initial`` and ``goal_test(state)``; it is
            also handed to ``Node.expand`` to generate successors.

    Returns:
        The first node whose state passes ``problem.goal_test``, or ``None``
        when the reachable state space is exhausted without finding one.

    States are identified by ``str(state)``.  A state is recorded as seen the
    moment it is enqueued, so each distinct state enters the frontier and is
    expanded at most once.
    """
    root = Node(problem.initial)
    frontier = Queue()
    frontier.put(root)
    seen = {str(root.state)}

    while not frontier.empty():
        # The queue was just checked to be non-empty, so this never blocks.
        current = frontier.get_nowait()

        if problem.goal_test(current.state):
            return current

        for child in current.expand(problem):
            key = str(child.state)
            if key not in seen:
                seen.add(key)
                frontier.put(child)
    return None

In [13]:
def aStar(problem: Problem, wight=1.0, display=False, graph_search=True) -> Node | None:
    h = problem.h 
    frontier = PriorityQueue()
    explored = set()
    node = Node(problem.initial)
    frontier.push(node, wight*h(node) + node.path_cost)
    c = 0
    while not frontier.isEmpty():
        node: Node = frontier.pop()
        if graph_search:
            explored.add(str(node.state))
        
        if problem.goal_test(node.state):
            if display:
                print("frontier count:", frontier.count)
                print("expanded nodes:", c)
                print(f"< goal state:\n{node.state} >")
            return node
        
        for child in node.expand(problem):
            if str(child.state) not in explored:
                frontier.push(child, wight*h(node) + node.path_cost)
        c += 1
    return None

## running the algorithms

In [14]:
## set initial parameters (first puzzle)
# 6x6 grid: 0 is an empty cell, any other value is the code of the car on it.
parking = np.zeros((6, 6), dtype=int)
parking[2, 1:3] = 1  # target car (passed as car_code below)
parking[0:2, 2] = 2
parking[1:4, 3] = 3
parking[1:3, 4] = 4
parking[0, 4:] = 5
parking[3, 4:] = 6
# parking[4, 3:] = 7  # extra car, currently disabled

print(parking)
# Derive the car codes from the grid itself so the list cannot drift out of
# sync with the placements above (e.g. when a car is commented out).
cars = np.unique(parking[parking != 0]).tolist()
problem1 = RushHour(parking, 1, cars)

[[0 0 2 0 5 5]
 [0 0 2 3 4 0]
 [0 1 1 3 4 0]
 [0 0 0 3 6 6]
 [0 0 0 0 0 0]
 [0 0 0 0 0 0]]


In [15]:
## set initial parameters (second puzzle)
# 6x6 grid: 0 is an empty cell, any other value is the code of the car on it.
parking = np.zeros((6, 6), dtype=int)

# cars laid out along a row
parking[0, 3:] = 5   # B
parking[2, 0:2] = 1  # A (passed as car_code below)
parking[3, 0:2] = 4  # C
parking[3, 3:5] = 6  # D

# cars laid out along a column
parking[0:2, 2] = 2  # I
parking[2:4, 2] = 3  # J
parking[1:3, 3] = 8  # K
parking[2:5, 5] = 7  # L
parking[4:, 0] = 9   # G
parking[4:, 1] = 10  # H

print(parking)
cars = list(range(1, 11))  # codes 1..10, one per car placed above
problem2 = RushHour(parking, 1, cars)

[[ 0  0  2  5  5  5]
 [ 0  0  2  8  0  0]
 [ 1  1  3  8  0  7]
 [ 4  4  3  6  6  7]
 [ 9 10  0  0  0  7]
 [ 9 10  0  0  0  0]]


In [16]:
## A* search
res = aStar(problem1, wight=1, display=True)
if res is None:
    # aStar returns None when the frontier runs empty without reaching a goal.
    print("no solution found")
else:
    path = res.path()
    print("path length:", len(path))
    # print(path)

frontier count: 8875
expanded nodes: 2887
< goal state:
[[0 0 2 0 5 5]
 [0 0 2 0 0 0]
 [0 1 1 0 0 0]
 [0 6 6 3 4 0]
 [0 0 0 3 4 0]
 [0 0 0 3 0 0]] >
path length: 10


In [17]:
## weighted A* search
res = aStar(problem1, wight=1.1, display=True)
if res is None:
    # aStar returns None when the frontier runs empty without reaching a goal.
    print("no solution found")
else:
    path = res.path()
    print("path length:", len(path))
    # print(path)

frontier count: 1736
expanded nodes: 684
< goal state:
[[0 0 2 0 5 5]
 [0 0 2 0 0 0]
 [0 1 1 0 0 0]
 [0 6 6 3 4 0]
 [0 0 0 3 4 0]
 [0 0 0 3 0 0]] >
path length: 10


In [18]:
## bfs
res = bfs(problem1)
if res is None:
    # bfs returns None when every reachable state was visited without a goal.
    print("no solution found")
else:
    print(res.state)
    path = res.path()
    print("path length:", len(path))
    # print(path)

[[0 0 5 5 4 0]
 [0 0 0 0 4 0]
 [1 1 0 0 0 0]
 [0 0 2 3 6 6]
 [0 0 2 3 0 0]
 [0 0 0 3 0 0]]
path length: 10
