In [1]:
from PIL import Image, ImageDraw
import numpy as np
import matplotlib.pyplot as plt
from math import sqrt
from heapq import heappop, heappush
from random import randint
from datetime import datetime
import copy
%matplotlib inline

from src.grid import intersect_cells, Map, intersect_points
from src.theta import make_path
from src.theta_ap import *
from src.utils import *
from src.search_tree import *

In [2]:
class Drawer:
    def __init__(self, grid_map, start, goal):
        self.start = start
        self.goal = goal
        k = 50
        r = 0.25 * k
        rr = 0.05 * k
        self.grid_map = grid_map
        self.ims = []
        
        height, width = self.grid_map.get_size()
        h_im = height * k
        w_im = width * k
        self.static_im = Image.new('RGBA', (w_im, h_im), color = (255, 255, 255, 255))
        
        self.draw_static_im()
    
    def draw_static_im(self):
        k = 50
        r = 0.25 * k
        rr = 0.05 * k
        
        height, width = self.grid_map.get_size()
        draw = ImageDraw.Draw(self.static_im)
        
        # grid points
        for i in range(height + 1):
            for j in range(width + 1):
                draw.ellipse((j * k - rr, i * k - rr, j * k + rr, i * k + rr), fill=(50, 50, 50))

        # obstacles
        for i in range(height):
            for j in range(width):
                if(not self.grid_map.traversable(i, j)):
                    draw.rectangle((j * k, i * k, (j + 1) * k - 1, (i + 1) * k - 1), fill=( 70, 80, 80 ))
                    
        # start
        draw.ellipse((self.start.j * k - r, self.start.i * k - r, self.start.j * k + r, self.start.i * k + r), fill=(40, 180, 99))

        # goal
        draw.ellipse((self.goal.j * k - r, self.goal.i * k - r, self.goal.j * k + r, self.goal.i * k + r), fill=(231, 76, 60))
 
        
    def cute_draw(self, curr, parents=set(), show=False):
        k = 50
        r = 0.25 * k
        rr = 0.05 * k
        
        height, width = self.grid_map.get_size()
        h_im = height * k
        w_im = width * k
        im = copy.deepcopy(self.static_im)
        draw = ImageDraw.Draw(im)
        static_draw = ImageDraw.Draw(self.static_im)
            
        # curr
        draw.ellipse((curr.j * k - r, curr.i * k - r, curr.j * k + r, curr.i * k + r), fill=(52, 152, 219))
        draw.line(((curr.j * k, curr.i * k), (curr.parent.j * k, curr.parent.i * k)), fill=(219, 152, 52), width = 5)
        
        # static curr
        if curr in parents:
            static_draw.ellipse((curr.j * k - r, curr.i * k - r, curr.j * k + r, curr.i * k + r), fill=(52, 152, 219, 100))
            static_draw.line(((curr.j * k, curr.i * k), (curr.parent.j * k, curr.parent.i * k)), fill=(219, 152, 52, 100), width = 5)
        
        self.ims.append(im)
        if show:
            _, ax = plt.subplots(dpi=150)
            ax.axes.xaxis.set_visible(False)
            ax.axes.yaxis.set_visible(False)
            plt.imshow(np.asarray(im))
            plt.show()
        
    def save(self, path, duration=1):
        self.ims[0].save(path,  save_all=True, append_images=self.ims[1:], optimize=False,
                         duration=duration*len(self.ims), loop=0)
        

In [3]:
def simple_test(search_func, task, *args, **kwargs):
    '''
    simple_test runs search_func on one task (use a number from 0 to 25 to choose a certain debug task on simple map or None to choose a random task from this pool) with *args as optional arguments and displays:
     - 'Path found!' and some statistics -- path was found
     - 'Path not found!' -- path was not found
     - 'Execution error' -- an error occurred while executing the SearchFunction In first two cases function also draws visualisation of the task
    '''
    
    height = 15
    width = 30
    map_str = '''
.....................@@.......
.....................@@.......
.....................@@.......
...@@................@@.......
...@@........@@......@@.......
...@@........@@......@@@@@....
...@@........@@......@@@@@....
...@@........@@...............
...@@........@@...............
...@@........@@...............
...@@........@@...............
...@@........@@...............
.............@@...............
.............@@...............
.............@@...............
'''

    task_map = Map()
    task_map.read_from_string(map_str, width, height)
    starts = [(9, 0), (13, 0), (7, 28), (14, 29), (4, 1), (0, 17), (5, 6), (5, 20), (12, 2), (7, 28), (11, 9), (3, 2), (3, 17), (13, 20), (1, 1), (9, 10), (14, 6), (2, 0), (9, 28), (8, 6), (11, 6), (3, 0), (8, 9), (14, 7), (12, 4), (7, 1), (9, 2), (3, 5), (3, 5)]
    goals = [(11, 20), (2, 19), (6, 5), (4, 18), (9, 20), (7, 0), (2, 25), (12, 4), (3, 25), (0, 12), (4, 23), (2, 24), (9, 2), (1, 6), (13, 29), (14, 29), (2, 28), (14, 16), (13, 0), (1, 27), (14, 25), (10, 20), (12, 28), (2, 29), (1, 29), (4, 26), (3, 17), (9, 2), (3, 5)]

    if (task is None) or not (0 <= task < len(starts)):
        task = randint(0, len(starts) - 1)
        
    print("task number: ", task)

    start = NodeAP(*starts[task])
    goal = NodeAP(*goals[task])
    try:
        result = search_func(task_map, start.i, start.j, goal.i, goal.j, *args, **kwargs)
        number_of_steps = result[2].expansions
        nodes_created = result[2].max_tree_size
        if result[0]:
            path = make_path(result[1])
            print("Path found! Length: " + str(result[2].way_length) + \
                ". Memory_usage: " + str(nodes_created) + \
                ". Number of steps: " + str(number_of_steps))
            path_cells = [(node.i, node.j) for node in path[0]]
            print(path_cells)
        else:
            print("Path not found!")
        return result

    except Exception as e:
        print("Execution error")
        print(e)


In [4]:
def hard_test(search_func, task, *args, **kwargs):
    '''
    simple_test runs search_func on one task (use a number from 0 to 25 to choose a certain debug task on simple map or None to choose a random task from this pool) with *args as optional arguments and displays:
     - 'Path found!' and some statistics -- path was found
     - 'Path not found!' -- path was not found
     - 'Execution error' -- an error occurred while executing the SearchFunction In first two cases function also draws visualisation of the task
    '''
    
    height = 4
    width = 10
    map_str = '''
.....@..@.
.@...@..@.
.@...@..@.
.@........
'''

    task_map = Map()
    task_map.read_from_string(map_str, width, height)
    starts = [(1, 0)]
    goals = [(1, 6)]

    if (task is None) or not (0 <= task < len(starts)):
        task = randint(0, len(starts) - 1)
        
    print("task number: ", task)

    start = NodeAP(*starts[task])
    goal = NodeAP(*goals[task])
    try:
        result = search_func(task_map, start.i, start.j, goal.i, goal.j, *args, **kwargs)
        number_of_steps = result[2].expansions
        nodes_created = result[2].max_tree_size
        if result[0]:
            path = make_path(result[1])
            print("Path found! Length: " + str(result[2].way_length) + \
                ". Memory_usage: " + str(nodes_created) + \
                ". Number of steps: " + str(number_of_steps))
            path_cells = [(node.i, node.j) for node in path[0]]
            print(path_cells)
        else:
            print("Path not found!")
        return result

    except Exception as e:
        print("Execution error")
        print(e)


In [5]:
def draw_path(grid_map, start_i, start_j, goal_i, goal_j, goal, store_path):
    current = goal
    path = []
    drawer = Drawer(grid_map, Point(start_i, start_j), Point(goal_i, goal_j))
    parents = set()
    while current.prev != current:
        parents.add(current.parent)
        path.append(current)
        current = current.prev
    path.reverse()
    for node in path:
        drawer.cute_draw(node, parents, False)
    drawer.save(store_path, 5)

In [6]:
def theta_ap_draw(grid_map, start_i, start_j, goal_i, goal_j, heuristic_func = None, search_tree = None):
    start_time = datetime.now() #statistic
    
    stats = Stats() # statistic

    grid_map.add_special_point((goal_i, goal_j))
    
    ast = search_tree() 
    start = NodeAP(start_i, start_j, g=0, parent = None, prev = None)
    start.apply_heuristic(heuristic_func, goal_i, goal_j)
    ast.add_to_open(start)
    
#     drawer = Drawer(grid_map, Point(start_i, start_j), Point(goal_i, goal_j))
#     drawer.cute_draw(Point(start_i, start_j))
    
    while not ast.open_is_empty():
        curr = ast.get_best_node_from_open()
        if curr is None:
            break
        
        ast.add_to_closed(curr)

#         drawer.cute_draw(curr)
        
        # print("get", curr, "prev", curr.prev, "parent", curr.parent)
        
        if (curr.i == goal_i) and (curr.j == goal_j): # curr is goal
            stats.runtime = datetime.now() - start_time # statistic
            stats.way_length = make_path(curr)[1] # statistic
#             drawer.save("theta.gif")
            draw_path(grid_map, start_i, start_j, goal_i, goal_j, curr, "theta_1.gif")
            return  (True, curr, stats, ast.OPEN, ast.CLOSED)
        
        
        
        # expanding curr
        stats.expansions += 1 # statistic
        successors = getSuccessors(curr, grid_map, goal_i, goal_j, heuristic_func, start_i, start_j, ast)
        
        for node in successors:
            if not ast.was_expanded(node):
                ast.add_to_open(node)
                
        stats.max_tree_size = max(stats.max_tree_size, len(ast)) # statistic
        
    stats.runtime = datetime.now() - start_time # statistic
    stats.way_length = 0 # statistic
#     drawer.save("theta.gif")
    draw_path(grid_map, start_i, start_j, goal_i, goal_j, curr, "theta_1.gif")
    return (False, None, stats, ast.OPEN, ast.CLOSED)


In [7]:
%time res = simple_test(theta_ap_draw, 6, euclidian_distance, SearchTreePQS)

task number:  6
Path found! Length: 26.015339721864635. Memory_usage: 308. Number of steps: 199
[(5, 6), (4, 13), (4, 15), (7, 21), (7, 26), (4, 26), (2, 25)]
CPU times: user 584 ms, sys: 107 ms, total: 691 ms
Wall time: 690 ms
