# A* Algo

In [1]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import colors
import math
import heapq

n = 5
p=0.3
block_unblock_matrix = []
agent_matrix = []
parentDict = {}

In [2]:
def euclidean_distance_calc(i, j, n):
    """
    This function returns Euclidean distance for every cell
    i: row value
    j: column value
    n: size of the maze
    """
    return math.sqrt(((n - i)**2 + (n - j)**2))


def chebyshev_distance_calc(i, j, n):
    """
    This function returns chebyshev distance for every cell
    i: row value
    j: column value
    n: size of the maze
    """
    return max((n - i), (n - j))


def manhattan_distance_calc(i, j, n):
    """
    This function returns manhattan distance for every cell
    i: row value
    j: column value
    n: size of the maze
    """
    return abs(n - i) + abs(n - j)


def compute_hofn(n, heuristic):
    """
    This function returns h(n) value for every cell as a matrix
    heuristic: specific type of heuristic used to calculate h(n)
    n: size of the maze
    """
    hofn_matrix = []
    for i in range(n):
        hofn_row = []
        for j in range(n):
            if heuristic == "euclidean":
                dist = euclidean_distance_calc(i, j, n -1)
            elif heuristic == "chebyshev":
                dist = chebyshev_distance_calc(i, j, n -1)
            elif heuristic == "manhattan":
                dist = manhattan_distance_calc(i, j, n -1)
            hofn_row.append(dist)
        hofn_matrix.append(hofn_row)
    return np.array(hofn_matrix)

In [3]:
def populateMatrix(n,p):
    """
    This function returns randomly populated maze
    p: probabillity density of each cell
    n: size of the maze
    """
    global block_unblock_matrix
    np_array = np.random.rand(n*n)
    block = lambda x: 0 if x>=p else 1
    vectorized_block = np.vectorize(block)
    np_array = vectorized_block(np_array).reshape(n,n)
    np_array[0,0]=0
    np_array[n-1,n-1] =0
    block_unblock_matrix = np.asmatrix(np_array)

#############testing on hard coded matrix ###########
# block_unblock_matrix = [[0, 0, 0, 0, 0],
#         [0, 0, 0, 0, 0],
#         [0, 0, 0, 0, 0],
#         [0, 1, 0, 0, 0],
#         [1, 0, 0, 1, 0]]
#######################################
# populateMatrix(n,p)
# block_unblock_matrix = np.matrix(block_unblock_matrix)
# plt.spy(block_unblock_matrix)

In [4]:
def initialize(dim,p):
    global agent_matrix
    global n
    global parentDict
    n = dim
    block_unblock_matrix = []
    np_array = np.zeros((n,n), dtype=int)
    agent_matrix = np.asmatrix(np_array)
    parentDict = {}
    populateMatrix(n,p)

## g(n) matrix

In [5]:
def compute_gofn(n):
    k = 0
    gofn = []
    one_row = []
    for j in range(n):
        one_row.append(k)
        k += 1
    gofn.append(one_row)
    for i in range(n-1):
        one_row = [x+1 for x in one_row]
        gofn.append(one_row)

    gofn_matrix = np.array([np.array(xi) for xi in gofn])
    return gofn_matrix

In [6]:
def get_gofn(gofn_matrix, row, col):
    return gofn_matrix[row][col]

In [7]:
def get_hofn(hofn_matrix, row, col):
    return hofn_matrix[row][col]

In [8]:
def get_children(matrix, x, y, n, gofn, visited_list):
    """
    This function is used to return children of a particular node
    x: row value of current node
    y: column value of current node
    n: size of maze
    gofn: g(n) value of current node
    visited_list: list of nodes that are already visited
    matrix: maze
    agent_matrix: matrix of the agent that holds the information of updated environment
    """
    allChildren = [(x+1,y,gofn+1),#down
                   (x-1,y,gofn+1),#up
                   (x,y+1,gofn+1),#right
                   (x,y-1,gofn+1)]#left
    if x+1 > n-1:
        allChildren.remove((x+1,y,gofn+1))
    if x-1 < 0:
        allChildren.remove((x-1,y,gofn+1))
    if y+1 > n-1:
        allChildren.remove((x,y+1,gofn+1))
    if y-1 < 0:
        allChildren.remove((x,y-1,gofn+1))
    allChildren = [node for node in allChildren if node[:2] not in visited_list]

    return allChildren

In [9]:
def display(trajectory_path):
    """
    This function is used to display maze with traversed path(highlighted)
    trajectory_path: path traversed by the agent so far
    """
    np_array = np.zeros((n,n), dtype=int)
#     np_array = populateMatrix(n,0)
    final_display_matrix = np.asmatrix(np_array)
    for path in trajectory_path:
        final_display_matrix[path[:2]] = 1
    colormap = colors.ListedColormap(["white","green"])
    plt.imshow(final_display_matrix, cmap=colormap)
    plt.show()

In [10]:
def display2(trajectory_path,matrix):
    """
    This function is used to display maze with traversed path(highlighted)
    trajectory_path: path traversed by the agent so far
    matrix: maze of the problem
    """
    temp = matrix.copy()
    for path in trajectory_path:
        temp[path[:2]] = 2
    colormap = colors.ListedColormap(["white","black","green"])
    plt.imshow(temp, cmap=colormap)
    plt.show()

In [11]:
def node_block_check(children_dict, block_unblock_matrix, current_node):
    """
    This function is used to remove the nodes which are blocked and discarded from display
    trajectory_path: path traversed by the agent so far
    children_dict: the dictionary holding information about parent and child relationship
    block_unblock_matrix: The maze with zeroes and ones
    current_node: The current node at which agent resides in
    """
    node_blocked = 0
    node_removal_list = []
#     for node in children_dict:
    children_node_list = children_dict[current_node].copy()
    for i in range(len(children_node_list)):
        #checking if all children are blocked
        if block_unblock_matrix.item(children_node_list[i]) == 1:
            node_blocked += 1
            children_dict[current_node].remove(children_node_list[i])
        if node_blocked == len(children_node_list):
            node_removal_list.append(current_node)
    return node_removal_list

In [12]:
def buildParentDict(children_dict):
    """
    This function is used to return 1:1 parent and children relationship for every node
    trajectory_path: path traversed by the agent so far
    children_dict: the dictionary holding information about parent and child relationship
    """
    global parentDict
    for key in children_dict.keys():
        for child in children_dict[key]:
            parentDict[child] = key

In [13]:
def getShortestPath(start):
    """
    This function is used to return shortest path once the goal node is discovered
    """
    path = [(n-1,n-1)]
    node = (n-1,n-1)
    start = start[:2]
    while node != start:
        path.append(parentDict[node])
        node = parentDict[node]
    return path

In [14]:
def get_fofn(open_list, hofn_matrix):
    """
    This function returns the sorted list of nodes based on their costs to the goal node
    open_list: Consists of list of nodes yet to be visited
    hofn_matrix: A computed distance of hofn for each node based on heuristic type chosen(manhattan, euclidian..)
    gofn_matrix: A computed distance from start to the current node
    """
    sort_dict = {}
    for node in open_list:
        row = node[0]
        col = node[1]
        gofn = node[2]
        hofn = hofn_matrix.item(row,col)
        fofn = gofn + hofn
        sort_dict[node] = fofn

    return sort_dict

In [15]:
def priority_queue(open_list, hofn_matrix):
    """
    This function returns the sorted list of nodes based on their costs to the goal node
    open_list: Consists of list of nodes yet to be visited
    hofn_matrix: A computed distance of hofn for each node based on heuristic type chosen(manhattan, euclidian..)
    """

    temp_open_list = []
    hofn_open_list = []
    new_open_list = []
    
    fofn_dict = get_fofn(open_list, hofn_matrix)
    
    # map fofn with every node before processing it with min heap
    for fofn in fofn_dict:
        for node in open_list:
            if node[:2] == fofn[:2]:
                node = list(node[:2])
                node.append(fofn_dict[fofn])
                hofn_open_list.append(tuple(node))

#     print("hofn_open_list", hofn_open_list)
    #obtaining the third parameter in the node tuple to get the node's fofn value to run it on heap
    for node in hofn_open_list:
        temp_open_list.append(node[2])
    
    #using heap library function to sort the queue based on least cost to goal node
    heapq.heapify(temp_open_list)

    for heuristic in temp_open_list:
        for node in hofn_open_list:
            if heuristic == node[2]:
                if node not in new_open_list:
                    new_open_list.append(node)
                continue
    return new_open_list

In [16]:
def astar(start, agent_matrix):
    """
    This function is used to discover the path from start to the goal
    agent_matrix: The matrix which store information about discovered maze
    """
    
    visited_list = []
    open_list = []
    trajectory_path = []
    trajectory_plus_gofn = []
    path_block = 0
    children_dict = {}
    i = 0
    goal = (n-1, n-1)
    num_of_cells_processed = 0

    open_list.append(start)
    gofn_matrix = compute_gofn(n)
    hofn_matrix = compute_hofn(n, "manhattan")

    while open_list:
        child_list = []
        current_node = open_list.pop(0)
        num_of_cells_processed += 1

        # adding current node to visited list of nodes if not added before
        if current_node[:2] not in visited_list:
            visited_list.append(current_node[:2])

        # adding current node to list of final path of nodes if not added before
            if current_node[:2] not in trajectory_path: 
                trajectory_path.append(current_node[:2])
                trajectory_plus_gofn.append(current_node)

            # Condition to have reached the goal node
            if agent_matrix.item(current_node[0], current_node[1]) == 0:
                if current_node[0] == n-1 and current_node[1] == n-1:
#                     print("Reached Goal!!!!!!!!!!!!!!!!!!!!!!!")
                    trajectory_plus_gofn.append(current_node)
                    buildParentDict(children_dict)
                    shortestPath = getShortestPath(start)
                    return (trajectory_path,trajectory_plus_gofn,num_of_cells_processed,shortestPath)

                # Compute the list of children nodes for the current node
                children = get_children(agent_matrix, current_node[0], current_node[1], n, current_node[2], visited_list)

                # Remove the third parameter (g(n)) from each of the child tuple
                for x in children:
                    child_list.append(x[:2])
                children_dict[current_node[:2]] = child_list

                # Get the next node of least weight/cost
                for node in children:
                    if node not in open_list:
                        open_list.append(node)
                open_list = priority_queue(open_list, hofn_matrix)

                # Remove the node if it is blocked to retract from the previous unblocked node
                node_removal_list = node_block_check(children_dict, agent_matrix, current_node[:2])
                for node in node_removal_list:
                    if node in trajectory_path:
                        trajectory_path.remove(node)
                        trajectory_plus_gofn = [i for i in trajectory_plus_gofn if i[:2] != node]

            else:
                if open_list == []:
#                     print("There is no path available to goal Node")
                    return ([],[], 0,[])
                else:
                    if current_node[:2] in trajectory_path:
                        trajectory_path.remove(current_node[:2])
                        trajectory_plus_gofn.remove(current_node)
        else:
            if open_list == []:
#                     print("There is no path available to goal Node++")
                    return ([],[], 0,[])
    return ([],[], 0,[])

In [17]:
def repeated_astar():
    """
    This function is used to call a_star repeatedly to get the shortest path from start to the goal node
    """
    global agent_matrix
    global block_unblock_matrix

    goal_reached = False
    goal = (n-1,n-1)
    path = []
    final_path = [(0,0,0)]
    trajectory_length = 0
    num_of_cells =0
    
    while not goal_reached:
        (path1,path2,astar_num_cells,shortest_path) = astar((final_path[-1][0],final_path[-1][1],0),agent_matrix)
        num_of_cells += astar_num_cells
        if(path1 == []):
            return (0,0,[])
        
        for node in path2:
            if block_unblock_matrix[node[:2]] == 0:
#                 print("node: ",node[:2]," is unblocked")
                if node not in final_path:
                    final_path.append(node)
                    trajectory_length += 1
                if node[:2] == goal:
                    goal_reached = True
                    break
            else:
                agent_matrix[node[:2]] = 1
                trajectory_length += 2
                break
    trajectory_path ,_,_,shortest_path_final_discovered_gridworld= astar((0,0,0),agent_matrix)
    return (trajectory_length, num_of_cells, shortest_path_final_discovered_gridworld)

In [18]:
######## Repeated A* #####################
# initialize(n,p)
# trajectory_length, num_of_cells, shortest_path_final_discovered_gridworld = repeated_astar()
# print("Trajectory length: ",trajectory_length,"\n num of cells processed: ",num_of_cells,"shortest path:")
# display2(shortest_path_final_discovered_gridworld,agent_matrix)
# plt.spy(agent_matrix)
########## A* on full gridworld ###########
# initialize(n,p)
# print(block_unblock_matrix)
# print("A*: ")
# (trajectory_path,_,_,shortest_path) = astar((0,0,0),block_unblock_matrix)
# print("trajectory A*: ",shortest_path)
# display(trajectory_path)
# print("shortest path A*: ",shortest_path)
# display(shortest_path)
# print("Actual Gridworld")
# plt.spy(block_unblock_matrix)
#############################################
# parentDict = {}
############ A* on final discovered Gridworld ##########
# print("A* on final discovered gridworld:")
# trajectory_path ,_,_,shortest_path= astar((0,0,0),agent_matrix)
# print("trajectory_path for A* on final discovered world: ",trajectory_path)
# display2(trajectory_path,agent_matrix)
# print("shortest path for A* on final discovered world:")
# display2(shortest_path,agent_matrix)
# print("shortest path: ",shortest_path)

In [37]:
# plt.spy(block_unblock_matrix)