<h1><center>Search Methods in Artificial Intelligence Course Project</center></h1>
<h2><center>Comparison Between A*, RBFS and an improved RBFS</center></h2>
<h3><center>Koren Levenbrown, Shvat Messica</center></h3>

In [1]:
from queue import PriorityQueue as queue
from time import time
import numpy as np
from sys import maxsize
from abc import abstractmethod
import matplotlib.pyplot as plt
import seaborn as sns
import multiprocessing
import random
from tqdm import tqdm
from collections import Counter
random.seed(0)
np.random.seed(0)

<h1><center>A* algorithm definition</center></h1>

In [2]:
def run_Astar(initial_state, results, *rest):
    """Run A* from ``initial_state`` and report the outcome through ``results``.

    ``results['results']`` is set to ``False`` up front and overwritten with
    ``(solution_actions, expanded_nodes_counter)`` once a node passing
    ``goal_test()`` is popped.  If the frontier empties first it stays
    ``False``.

    Args:
        initial_state: start node; must expose ``evaluation_function``,
            ``state`` (with ``tolist()``), ``goal_test()``,
            ``find_solution()`` and ``generate_next_potential_states()``.
        results: dict-like object that receives the outcome.
        *rest: ignored; accepted so all algorithms share one call signature.
    """
    results['results'] = False
    frontier = queue()
    # Keys are the stringified board.  The original lookup used the raw list,
    # which is unhashable, so the membership test always raised and every
    # successor was re-queued.
    # NOTE(review): the key is built from ``state`` only; for games whose node
    # identity includes more than ``state``, confirm this is the intended
    # notion of "already explored".
    explored = set()
    expanded_nodes_counter = 0
    # The counter doubles as a unique tie-breaker so that two entries with the
    # same f-value never fall through to comparing the node objects.
    frontier.put((initial_state.evaluation_function, expanded_nodes_counter, initial_state))

    while not frontier.empty():
        current_state = frontier.get()[2]
        explored.add(str(current_state.state.tolist()))
        if current_state.goal_test():
            results['results'] = current_state.find_solution(), expanded_nodes_counter
            return

        for potential_state in current_state.generate_next_potential_states():
            if str(potential_state.state.tolist()) in explored:
                continue
            expanded_nodes_counter += 1
            frontier.put((potential_state.evaluation_function, expanded_nodes_counter, potential_state))
    return

<h1><center>RBFS algorithms definition</center></h1>

In [3]:
def run_RBFS(initial_state, results, epsilon = 0, mean_v = False):
    """Run an RBFS variant from ``initial_state`` and report through ``results``.

    ``results['results']`` is set to ``False`` up front and overwritten with
    ``(solution_actions, expanded_nodes_counter)`` when the search returns a
    goal node.

    Args:
        initial_state: start node handed to the search function.
        results: dict-like object that receives the outcome.
        epsilon: slack added to the f-limit inside the search function.
        mean_v: when true use ``RBFS_search_mean``, otherwise ``RBFS_search``.
    """
    results['results'] = False
    search = RBFS_search_mean if mean_v else RBFS_search

    # The middle element (the backed-up f-value) is irrelevant at the root.
    node, _, num_expands = search(initial_state, maxsize, 0, epsilon)
    if node is not None:
        results['results'] = node.find_solution(), num_expands

def RBFS_search(state,f_limit, expanded_nodes_counter, epsilon):
    """Recursive best-first search below ``state``.

    Args:
        state: node to expand; must expose ``goal_test()``,
            ``generate_next_potential_states()`` and, on its children,
            a writable ``evaluation_function``.
        f_limit: f-value bound inherited from the caller.
        expanded_nodes_counter: running count of generated children.
        epsilon: slack added to ``f_limit`` before a subtree is abandoned.

    Returns:
        ``(goal_node, None, counter)`` when a goal is found, otherwise
        ``(None, backed_up_f, counter)`` where ``backed_up_f`` is the best
        child's f-value (or ``maxsize`` when ``state`` has no children).
    """
    if state.goal_test():
        return state, None, expanded_nodes_counter

    children = state.generate_next_potential_states()
    if len(children) == 0:
        return None, maxsize, expanded_nodes_counter

    expanded_nodes_counter += len(children)
    # (f, generation order, node): the order index breaks f-value ties so the
    # sort never has to compare node objects.
    ranked = [(child.evaluation_function, order, child)
              for order, child in enumerate(children)]

    while True:
        ranked.sort()
        _, order, best = ranked[0]
        if best.evaluation_function > f_limit + epsilon:
            return None, best.evaluation_function, expanded_nodes_counter

        alternative = ranked[1][0] if len(ranked) > 1 else maxsize

        found, backed_up_f, expanded_nodes_counter = RBFS_search(
            best, min(f_limit, alternative), expanded_nodes_counter, epsilon)
        # Store the backed-up value on the node and in its sort key.
        best.evaluation_function = backed_up_f
        ranked[0] = (backed_up_f, order, best)
        if found is not None:
            return found, None, expanded_nodes_counter


def RBFS_search_mean(state,f_limit, expanded_nodes_counter, epsilon):
    """RBFS variant whose alternative bound is the mean of the 2nd and 3rd best f.

    Identical to plain RBFS except that, when a node has at least three
    children, the bound handed to the best child is
    ``min(f_limit, mean(second_best_f, third_best_f))`` instead of the
    second-best f-value alone.

    The recursion goes back into this function so the rule applies at every
    depth; previously it delegated to ``RBFS_search`` and only the root used
    the mean.

    Args:
        state: node to expand; must expose ``goal_test()``,
            ``generate_next_potential_states()`` and, on its children,
            a writable ``evaluation_function``.
        f_limit: f-value bound inherited from the caller.
        expanded_nodes_counter: running count of generated children.
        epsilon: slack added to ``f_limit`` before a subtree is abandoned.

    Returns:
        ``(goal_node, None, counter)`` when a goal is found, otherwise
        ``(None, backed_up_f, counter)``.
    """
    if state.goal_test():
        return state, None, expanded_nodes_counter

    children = state.generate_next_potential_states()
    if not len(children):
        return None, maxsize, expanded_nodes_counter

    # (f, generation order, node): the order index breaks ties so the sort
    # never compares node objects.
    successors = []
    for count, child in enumerate(children):
        expanded_nodes_counter += 1
        successors.append((child.evaluation_function, count, child))

    result = None
    while len(successors):
        successors.sort()
        best_state = successors[0][2]
        if best_state.evaluation_function > f_limit + epsilon:
            return None, best_state.evaluation_function, expanded_nodes_counter

        if len(successors) == 1:
            alternative = maxsize
        elif len(successors) > 2:  # at least three children: average the runners-up
            alternative = np.mean([successors[1][0], successors[2][0]])
        else:
            alternative = successors[1][0]

        result, best_state.evaluation_function, expanded_nodes_counter = RBFS_search_mean(
            best_state, min(f_limit, alternative), expanded_nodes_counter, epsilon)
        successors[0] = (best_state.evaluation_function, successors[0][1], best_state)
        if result is not None:
            break
    return result, None, expanded_nodes_counter

In [4]:
class Game():
    """Base search node shared by the concrete games.

    A node stores its board (``state``), the node it was generated from
    (``parent``), the move that produced it (``action``) and the cost
    accumulated so far (``path_cost``).
    """

    goal_state = None

    def __init__(self, state, parent, action, current_cost):
        self.state = state
        self.parent = parent
        self.action = action
        self.path_cost = current_cost

    @abstractmethod
    def goal_test(self):
        """Tell whether this node is a goal (implemented by subclasses)."""
        pass

    @abstractmethod
    def find_legal_actions(self, row, column):
        """List the moves available at ``(row, column)`` (implemented by subclasses)."""
        pass

    @abstractmethod
    def generate_next_potential_states(self):
        """Build the successor nodes (implemented by subclasses)."""
        pass

    def find_solution(self):
        """Return the actions leading from the root node to this node, in order."""
        path = []
        node = self
        # The root (parent is None) carries no action of its own, so the walk
        # stops before recording it.
        while node.parent is not None:
            path.append(node.action)
            node = node.parent
        path.reverse()
        return path

<h1><center>Puzzle game definition</center></h1>

In [5]:
class Puzzle(Game):
    """Sliding-tile puzzle node on an ``n`` x ``n`` board stored as a flat array.

    The blank is the value ``0`` and the goal layout is ``0, 1, ..., n*n - 1``.
    ``evaluation_function`` is ``path_cost`` plus the number of board
    positions that differ from the goal.
    """

    def __init__(self, state, parent, action, current_cost, n):
        super().__init__(state, parent, action, current_cost)
        # n and goal_state must exist before the heuristic is evaluated;
        # otherwise the comparison runs against the class-level
        # ``goal_state = None`` and every board gets the same estimate.
        self.n = n
        self.goal_state = list(range(self.n ** 2))
        self.evaluation_function = self.cost_estimation() + self.path_cost

    def goal_test(self):
        """Return True when the board equals the goal layout."""
        return self.state.tolist() == self.goal_state

    def cost_estimation(self):
        """Count the board positions whose value differs from the goal.

        NOTE(review): the blank position is counted as well; confirm whether
        the heuristic is meant to skip it.
        """
        return int(np.count_nonzero(np.asarray(self.goal_state) != self.state))

    def find_legal_actions(self, row, column):
        """Return the blank's legal moves at ``(row, column)``.

        Each border is tested independently so that a cell lying on two
        opposite borders (a 1x1 board) loses both moves.
        """
        legal_action = ['UP', 'DOWN', 'LEFT', 'RIGHT']
        if row == 0:
            legal_action.remove('UP')
        if row == self.n - 1:
            legal_action.remove('DOWN')
        if column == 0:
            legal_action.remove('LEFT')
        if column == self.n - 1:
            legal_action.remove('RIGHT')
        return legal_action

    def generate_next_potential_states(self):
        """Return one child node per legal move of the blank."""
        # Flat-index displacement of the blank for every move.
        offsets = {'UP': -self.n, 'DOWN': self.n, 'LEFT': -1, 'RIGHT': 1}
        next_potential_states = []
        empty_index = self.state.tolist().index(0)
        row, column = divmod(empty_index, self.n)

        for action in self.find_legal_actions(row, column):
            target = empty_index + offsets[action]
            new_state = self.state.copy()
            new_state[empty_index], new_state[target] = new_state[target], new_state[empty_index]
            next_potential_states.append(Puzzle(new_state, self, action, self.path_cost + 1, self.n))
        return next_potential_states
    
def create_random_puzzle(board_size, number_boards):
    """Return ``number_boards`` uniformly shuffled flat boards.

    Every row of the result is a permutation of ``0 .. board_size**2 - 1``,
    obtained by arg-sorting a row of uniform random numbers.

    Note: a later definition with the same name in this file replaces this one.
    """
    cells = np.arange(board_size ** 2)
    stacked = np.tile(cells, (number_boards, 1))
    # Ranking i.i.d. uniform draws yields an independent permutation per row.
    noise = np.random.rand(*stacked.shape)
    order = noise.argsort(axis = 1)
    return np.take_along_axis(stacked, order, axis = 1)

In [6]:
def find_legal_actions(row, column, n):
    """Return the move codes available to the blank at ``(row, column)``.

    Codes are ``0`` = up, ``1`` = down, ``2`` = left, ``3`` = right on an
    ``n`` x ``n`` board.  Each border is tested independently so that a cell
    lying on two opposite borders (``n == 1``) loses both moves.
    """
    legal_action = [0, 1, 2, 3]
    if row == 0:
        legal_action.remove(0)
    if row == n - 1:
        legal_action.remove(1)
    if column == 0:
        legal_action.remove(2)
    if column == n - 1:
        legal_action.remove(3)
    return legal_action

def blend(state, steps):
    """Scramble ``state`` in place with ``steps`` random legal blank moves.

    Args:
        state: flat square board containing the blank as ``0``; it is
            modified in place.
        steps: number of random moves to apply.

    Returns:
        The same ``state`` object after the moves.
    """
    n = int(len(state) ** 0.5)
    # Flat-index displacement of the blank for each move code
    # (0 = up, 1 = down, 2 = left, 3 = right).
    offsets = {0: -n, 1: n, 2: -1, 3: 1}
    for _ in range(steps):
        empty_index = state.tolist().index(0)
        moves = find_legal_actions(empty_index // n, empty_index % n, n)
        if not moves:
            # Nothing can move (degenerate board); further steps change nothing.
            break
        target = empty_index + offsets[random.choice(moves)]
        state[empty_index], state[target] = state[target], state[empty_index]
    return state

def create_random_puzzle(board_size, number_boards):
    """Return ``number_boards`` boards scrambled from the solved layout.

    Each board starts as ``0 .. board_size**2 - 1`` and is passed to
    ``blend`` with a random step count between 5 and 50 (inclusive).
    The boards are stacked into one array.
    """
    return np.array([
        blend(np.arange(board_size ** 2), random.randint(5, 50))
        for _ in range(number_boards)
    ])

<h1><center>helper functions</center></h1>

In [7]:
def wrap_timeout(target_f, args, time_sec):
    """Run ``target_f(*args)`` in a child process for at most ``time_sec`` seconds.

    If the child is still running after the wait it is terminated and reaped.
    Nothing is returned.
    """
    worker = multiprocessing.Process(target = target_f, args = args)
    worker.start()
    worker.join(time_sec)

    if not worker.is_alive():
        return

    # Still running after the wait: stop it and collect the process.
    worker.terminate()
    worker.join()
        
def calculate_statistics(total_runtimes, total_expanded_nodes_count):
    """Summarise per-algorithm runtimes and node counts over all experiments.

    Args:
        total_runtimes: 2-D array, one row per experiment and one column per
            algorithm, in seconds.
        total_expanded_nodes_count: array of the same layout holding node counts.

    Returns:
        A list of five per-algorithm lists: mean runtime (ms), runtime std
        (ms), mean node count, node-count std, and win rate (the share of
        experiments in which that algorithm had the smallest runtime).

    Raises:
        ValueError: if there is no completed experiment to summarise.
    """
    total_runtimes = np.asarray(total_runtimes)
    total_expanded_nodes_count = np.asarray(total_expanded_nodes_count)
    if total_runtimes.ndim != 2 or total_runtimes.size == 0:
        raise ValueError('calculate_statistics needs at least one completed experiment')

    total_runtimes = total_runtimes * 1000  # seconds -> milliseconds
    results = []
    results.append(total_runtimes.mean(axis=0).tolist())
    results.append(total_runtimes.std(axis=0).tolist())
    results.append(total_expanded_nodes_count.mean(axis=0).tolist())
    results.append(total_expanded_nodes_count.std(axis=0).tolist())

    # The column count comes from the shape, not from row 1, so a single
    # experiment row is handled as well.
    num_experiments, num_algorithms = total_runtimes.shape
    wins = np.bincount(total_runtimes.argmin(axis=1), minlength=num_algorithms)
    results.append((wins / num_experiments).tolist())
    return results

<h1><center>Puzzle experiments</center></h1>

In [8]:
def run_puzzle_experiments(algorithms, board_size, number_boards, timeout_sec=5):
    """Time every algorithm on random puzzles and return summary statistics.

    A board only contributes to the statistics when all algorithms solve it
    within the time limit; the first failure discards that board.

    Args:
        algorithms: list of dicts with a ``'func'`` callable and an
            ``'args_v'`` list of extra positional arguments.
        board_size: side length of the puzzle.
        number_boards: number of random boards to generate.
        timeout_sec: per-run time limit in seconds.

    Returns:
        The result of ``calculate_statistics`` over the retained boards.
    """
    states = create_random_puzzle(board_size, number_boards)
    total_runtimes = []
    total_expanded_nodes_count = []
    # One manager process serves every run and is shut down on exit, instead
    # of starting a fresh manager for each algorithm on each board.
    with multiprocessing.Manager() as manager:
        for i in tqdm(range(len(states))):
            initial_state = Puzzle(states[i], None, None, 0, board_size)
            runtimes = []
            expanded_nodes_count = []
            for algorithm in algorithms:
                shared = manager.dict()
                t0 = time()
                wrap_timeout(algorithm['func'], [initial_state, shared, *algorithm['args_v']], timeout_sec)
                # Stop the clock before talking to the manager proxy.
                runtime = time() - t0
                # A run killed before it wrote anything counts as a failure.
                outcome = shared.get('results', False)
                if not outcome:
                    break
                runtimes.append(runtime)
                expanded_nodes_count.append(outcome[1])
            else:
                # Reached only when no algorithm failed on this board.
                total_runtimes.append(np.array(runtimes))
                total_expanded_nodes_count.append(np.array(expanded_nodes_count))
    return calculate_statistics(np.array(total_runtimes), np.array(total_expanded_nodes_count))

# Puzzle experiment configuration: each algorithm entry pairs a runner with
# the extra positional arguments it receives after (initial_state, results).
board_size = 4
number_boards = 1
algorithm_names = ['A*', 'RBFS', 'RBFSe=5', 'RBFSmean']
algorithms = [{'func': run_Astar, 'args_v': [None]}, 
                        {'func': run_RBFS, 'args_v': [0, False]}, 
                        {'func': run_RBFS, 'args_v': [5, False]}, 
                        {'func': run_RBFS, 'args_v': [0, True]}]

results = run_puzzle_experiments(algorithms, board_size = board_size, number_boards = number_boards)

print('Puzzle experiment:')
print('Estimation function: No. of Misplaced Tiles')
print(f'Board size: {board_size}')
print(f'Number of experiments: {number_boards}')
for i in range(len(algorithm_names)):
    print(f'{algorithm_names[i]} stats:\t {results[0][i]} ± {results[1][i]} ms, {results[2][i]} ± {results[3][i]}, win rate {results[4][i]}')

# Second run with the same settings.  It must sit outside the reporting loop
# above: when indented into it, the experiment was repeated after every
# printed line and `results` changed in the middle of the report.
results = run_puzzle_experiments(algorithms, board_size = board_size, number_boards = number_boards)

print('Puzzle experiment:')
print('Estimation function: No. of Misplaced Tiles')
print(f'Board size: {board_size}')
print(f'Number of experiments: {number_boards}')
for i in range(len(algorithm_names)):
    print(f'{algorithm_names[i]} stats:\t {results[0][i]} ± {results[1][i]} ms, {results[2][i]} ± {results[3][i]}, win rate {results[4][i]}')

<h1><center>Maze game definition</center></h1>

In [9]:
class Maze(Game):
    """Grid-maze search node.

    ``state`` is a 2-D array in which ``-1`` marks a wall; cells entered along
    the path to this node are set to ``1``.  ``current_index`` and
    ``goal_index`` are ``(row, column)`` tuples.  ``evaluation_function`` is
    ``path_cost`` plus the Manhattan distance to the goal when
    ``cost_estimation_func == 'L1'`` and the Euclidean distance otherwise.
    """

    # (row, column) displacement for every move name.
    _MOVES = {'UP': (-1, 0), 'DOWN': (1, 0), 'LEFT': (0, -1), 'RIGHT': (0, 1)}

    def __init__(self, state, parent, action, current_cost, current_index, goal_index, cost_estimation_func):
        super().__init__(state, parent, action, current_cost)
        self.current_index = current_index
        self.goal_index = goal_index
        self.size = state.shape
        self.est_cost_function = cost_estimation_func
        if self.est_cost_function == 'L1':
            self.evaluation_function = self.manhattan_cost_estimation() + self.path_cost
        else:
            self.evaluation_function = self.euclidan_cost_estimation() + self.path_cost

    def goal_test(self):
        """Return True when the current cell is the goal cell."""
        return self.current_index == self.goal_index

    def manhattan_cost_estimation(self):
        """Return the L1 distance between the current cell and the goal."""
        return np.sum(np.abs(np.array(self.goal_index) - np.array(self.current_index)))

    def euclidan_cost_estimation(self):
        """Return the L2 distance between the current cell and the goal."""
        return np.linalg.norm(np.array(self.goal_index) - np.array(self.current_index))

    def find_legal_actions(self, row, column):
        """Return the moves from ``(row, column)`` that stay on the board and avoid walls."""
        legal_action = ['UP', 'DOWN', 'LEFT', 'RIGHT']
        # The border test comes first in each condition so the neighbour
        # lookup is never evaluated for a cell outside the grid.
        if row == 0 or self.state[row - 1, column] == -1:
            legal_action.remove('UP')
        if row == self.size[0] - 1 or self.state[row + 1, column] == -1:
            legal_action.remove('DOWN')
        if column == 0 or self.state[row, column - 1] == -1:
            legal_action.remove('LEFT')
        if column == self.size[1] - 1 or self.state[row, column + 1] == -1:
            legal_action.remove('RIGHT')
        return legal_action

    def generate_next_potential_states(self):
        """Return one child node per legal move, marking the entered cell with 1."""
        next_potential_states = []
        row, column = self.current_index

        for action in self.find_legal_actions(row, column):
            d_row, d_column = self._MOVES[action]
            new_index = (row + d_row, column + d_column)
            new_state = self.state.copy()
            new_state[new_index] = 1
            next_potential_states.append(Maze(new_state, self, action, self.path_cost + 1, new_index, self.goal_index, self.est_cost_function))
        return next_potential_states
    
def draw_solution(board, solution=None):
    """Mark the path described by ``solution`` on ``board`` and display it.

    The walk starts at ``(0, 0)``; the start cell and every cell reached by
    an action are set to ``2`` in ``board`` (modified in place) before the
    board is shown with matplotlib.
    """
    steps = {'UP': (-1, 0), 'DOWN': (1, 0), 'LEFT': (0, -1), 'RIGHT': (0, 1)}
    row, column = 0, 0
    board[(row, column)] = 2
    if solution:
        for action in solution:
            # An unrecognised action leaves the position where it is.
            d_row, d_column = steps.get(action, (0, 0))
            row, column = row + d_row, column + d_column
            board[(row, column)] = 2
    plt.imshow(board)
    plt.show()
    
def create_random_maze(maze_width, maze_one_percentage, number_boards):
    """Generate ``number_boards`` random square mazes.

    Each maze is a ``maze_width`` x ``maze_width`` float array of zeros in
    which randomly chosen cells are set to ``-1``.  The top-left and
    bottom-right corners are always left at ``0``.

    Args:
        maze_width: side length of each maze.
        maze_one_percentage: fraction (0..1) of cells drawn as candidates
            for ``-1``.
        number_boards: number of mazes to create.

    Returns:
        A list of 2-D numpy arrays.
    """
    boards = []
    maze_size = maze_width * maze_width
    wall_count = min(int(maze_one_percentage * maze_size), maze_size)
    for _ in range(number_boards):
        maze = np.zeros((maze_width, maze_width))
        # Sample without replacement so the requested number of distinct
        # cells is drawn; the default (with replacement) repeats indices and
        # yields fewer walls than asked for.
        chosen_indices = np.random.choice(maze_size, wall_count, replace=False)
        maze.reshape(-1)[chosen_indices] = -1  # reshape gives a view of `maze`
        # Keep both corner cells open.
        maze[0, 0] = 0
        maze[maze_width - 1, maze_width - 1] = 0
        boards.append(maze)
    return boards

<h1><center>Maze experiments</center></h1>

In [10]:
def run_maze_experiments(algorithms, cost_estimation_func, board_size, number_boards, timeout_sec=5):
    """Time every algorithm on random mazes and return summary statistics.

    Mazes are created with a wall fraction of 0.3; the search runs from
    ``(0, 0)`` to ``(board_size - 1, board_size - 1)``.  A maze only
    contributes to the statistics when all algorithms solve it within the
    time limit; the first failure discards that maze.

    Args:
        algorithms: list of dicts with a ``'func'`` callable and an
            ``'args_v'`` list of extra positional arguments.
        cost_estimation_func: heuristic selector passed to ``Maze``.
        board_size: side length of the maze.
        number_boards: number of random mazes to generate.
        timeout_sec: per-run time limit in seconds.

    Returns:
        The result of ``calculate_statistics`` over the retained mazes.
    """
    states = create_random_maze(board_size, .3, number_boards)
    total_runtimes = []
    total_expanded_nodes_count = []
    # One manager process serves every run and is shut down on exit, instead
    # of starting a fresh manager for each algorithm on each maze.
    with multiprocessing.Manager() as manager:
        for i in tqdm(range(len(states))):
            initial_state = Maze(states[i], None, None, 0, (0, 0), (board_size - 1, board_size - 1), cost_estimation_func)
            runtimes = []
            expanded_nodes_count = []
            for algorithm in algorithms:
                shared = manager.dict()
                t0 = time()
                wrap_timeout(algorithm['func'], [initial_state, shared, *algorithm['args_v']], timeout_sec)
                # Stop the clock before talking to the manager proxy.
                runtime = time() - t0
                # A run killed before it wrote anything counts as a failure.
                outcome = shared.get('results', False)
                if not outcome:
                    break
                runtimes.append(runtime)
                expanded_nodes_count.append(outcome[1])
            else:
                # Reached only when no algorithm failed on this maze.
                total_runtimes.append(np.array(runtimes))
                total_expanded_nodes_count.append(np.array(expanded_nodes_count))
    return calculate_statistics(np.array(total_runtimes), np.array(total_expanded_nodes_count))

# Maze experiments: the same algorithm line-up is evaluated on three board
# sizes, and a report is printed after each run.
number_boards = 1000
est_func = 'L1'
algorithm_names = ['A*', 'RBFS', 'RBFSe=5', 'RBFSmean']
algorithms = [{'func': run_Astar, 'args_v': [None]}, 
                        {'func': run_RBFS, 'args_v': [0, False]}, 
                        {'func': run_RBFS, 'args_v': [5, False]}, 
                        {'func': run_RBFS, 'args_v': [0, True]}]

for board_size in (7, 9, 11):
    results = run_maze_experiments(algorithms, est_func, board_size = board_size, number_boards = number_boards)

    print('Maze experiment:')
    print(f'Estimation function: {est_func}')
    print(f'Board size: {board_size}')
    print(f'Number of experiments: {number_boards}')
    for i, _ in enumerate(algorithm_names):
        print(f'{algorithm_names[i]} stats:\t {results[0][i]} ± {results[1][i]} ms, {results[2][i]} ± {results[3][i]}, win rate {results[4][i]}')