In [1]:
import numpy as np
import pandas as pd

## Preparation of Data

In [None]:
def generateRandomGrid(p):
    '''
        Function to fill a 101x101 grid with density of blocked cells = p
    '''
    matrix_string = "."
    for i in range(1,10200):
        if(random.uniform(0, 1)<=p):
            matrix_string+='X'
        else:
            matrix_string+='.'
    matrix_string+='.'
    return matrix_string

In [None]:
def generateData(p):
    '''
        Function generates Random Grids of size 101 x 101. For these grids, (0,0) is the start node and (100,100)
        is the target node.
        
        These function generates grids that are solvable. Total solvable grids are 1000.
    '''
    grids = []
    count = 0
    for i in range(0,2500):
        if count>=1000:
            break
        grid1 = generateRandomGrid(p)
        i = 0
        grid2 = []
        while(i<101*101):
            grid2.append(list(grid1[i:i+101]))
            i = i + 101
        grid2 = np.array(grid2)
        gw = GridWorld(grid2,(0,0),(100,100),101,101,agent_grid=grid2)
        path=gw.a_star((0,0),'m')
        if len(path)>0:
            grids.append(grid1)
            count+=1
    with open('p'+str(p)+'.txt', 'w') as f:
        print(str(p)+": ",len(grids))
        for grid in grids:
            f.write(grid)
            f.write('\n')

# A - Star Algorithm

Data Structures used:
1. SortedSet - 

    a. push: O(logn) 
    
    b. update: O(logn)
    
    c. pop: O(logn)

In [None]:
from sortedcontainers import SortedSet
from math import sqrt

class MyPriorityQueue(object):
    def __init__(self, current_heuristic, target):
        '''
            'current_heuristic': Contains the heuristics based on which the priorities are calculated. The value should have 'm' for Manhattan 
            distance, 'e' for Euclidean distance, 'c' for Chebyshev distance.
            
            'g' denotes the distance from the source to the current node.
            
            'h' denotes the heuristic that estimates the distance from current node to the target.
            
            '_data' is the SortedSet() that stores the nodes in the sorted order of priority.
        '''
        self.current_heuristic = current_heuristic
        self._data = SortedSet()
        self.target = target
        self.g = dict()
        self.h = dict()
    def __len__(self):
        ''' 
            returns the number of cells in the SortedSet 
        '''
        return len(self._data)
    
    def push(self, item):
        '''
            It creates a tuple whose first value contains the [total_priority(n), h(n), (x,y)].
            The first value contains the sum of g(n) and h(n) for cell n.
            The second value contains the value h(n)
            The third value contains the coordinate values of the cell n.
            
            To push, the cell in the fringe, the algorithm will first compare the overall cost = g(n)+h(n). If 
            for two cells, overall cost is same, then it will compare the estimated distance to target and return 
            the cell with lesser value.
            
            It is required to update g(n) and h(n) before pushing the node into the fringe to keep the rdering consistent
            with th cell values in g(n) and h(n).
        '''
        node = (self.calculate_priority(item), self.get_heuristic(item), item)
        self._data.add(node)
        
    def pop(self):
        '''
            Returns the cell with lowest priority.
        '''
        node = self._data.pop(0)[2]
        return node
    
    def manhattan_distance(self, x, y):
        '''
            Returns the Manhattan Distance between cell x and y.
            For cells (x1,y1) and (x2,y2) Mahattan Distance = abs(x1-x2) + abs(y1-y2) 
        '''
        return abs(x[0]-y[0])+abs(x[1]-y[1])
    
    def euclidean_distance(self, x, y): # Euclidean Distance -> heuristic = sqrt( (x1-x2)**2 + (y1-y2)**2 )
        '''
            Returns the Euclidean Distance between cell x and y.
            For cells (x1,y1) and (x2,y2) Euclidean Distance = sqrt( (x1-x2)**2 + (y1-y2)**2 )
        '''
        return sqrt( (x[0]-x[1])**2 + (y[0]-y[1])**2 )
    def chebyshev_distance(self, x, y): # Chebyshev Distance -> heuristic = max(|x1-x2|, |y1-y2|)
        '''
            Returns the Chebyshev Distance between cell x and y.
            For cells (x1,y1) and (x2,y2) Chebyshev Distance = max(abs(x1-x2), abs(y1-y2))
        '''
        return max(abs(x[0]-x[1]), abs(y[0]-y[1]))
    def get_heuristic(self, x):  # returns the value of heuristic based on the heuristic function
        '''
            Returns and stores the value of the heuristic based on the heuristic function 
            set in the self.current_heuristic. 
        '''
        measure = self.current_heuristic
        if measure == 'm': # if h is manhattan
            self.h[x] = self.manhattan_distance(x, self.target)
        elif measure == 'e': # if h is euclidean
            self.h[x] = self.euclidean_distance(x, self.target)
        elif measure == 'c': # if c is chebyshev
            self.h[x] = self.chebyshev_distance(x, self.target)
        return self.h[x]
    def calculate_priority(self, x): # calculates priority for each cell - distance from cell to target
        '''
            Returns the total_priority of the cell.
            Total Priority(n) = g(n) + h(n) { Standard heuristic function of A-Star algorithm }
        '''
        return self.g[x] + self.get_heuristic(x)

class GridWorld:
    def __init__(self, grid, start, target, n, m, agent_grid = None ):
        '''
            Instantiates the object that stores the information for the grid.
            n: denotes the number of rows
            m: denotes the number of columns
            start: denotes the start cell of the grid
            target: denotes the target cell of the grid
            trajectory: denotes the trajectory that the agent follows in the grid
            grid: 2d array that stores the full knowledge of the grid. '.' denotes an unblocked cell and 'X' denotes
                  a blocked cell.
            agent_grid: 2d array that stores the current knwledge of the agent. It gets updated after evey single run 
                        of the A-Star
            avg_num_cells_processed: Denotes the number of cells that are processed by the fringe.
        '''
        self.n = n
        self.m = m
        self.start = start
        self.target = target
        self.trajectory = []
        self.grid = grid
        if agent_grid is None:
            self.agent_grid = np.full((n,n),'.')
        else:
            self.agent_grid = agent_grid
        self.avg_num_cells_processed = 0
    def a_star(self, source, current_heuristic = 'm'):
        '''
            dirx, diry: used to get the nrighbouring nodes of the current node.
            visited: used to mark the nodes as visited.
            closed_list: used to mark the popped nodes. 
            open_list: used to store the nodes that are visited and it stores in the orde of non-decreasing tuple 
                        (total_priority, heuristic_value, (x,y))
            planned_path: List used to store cells that the agent plans to travel after applying A-star on the agent_grid
                          (grid that stores current knowledge of the grid)
            parent: dictionary that stores parent-child relations between cells. In (key, value) pairs, key is the child 
                    cell and value is the parent cell.
            
        '''
        dirx = [-1, 1, 0, 0] 
        diry = [0, 0, 1, -1] 
        visited = set() 
        closed_list = set() 
        open_list = MyPriorityQueue(current_heuristic, self.target)
        planned_path = []
        open_list.g[source]=0 # distance of source from the source is 0.
        open_list.calculate_priority( source) # calculating the heuristic value of the path from source to target 
        open_list.push( source) # Adding source to the fringe
        visited.add(source) # Marking the source as visited
        parent = {} 
        parent[source] = None # Parent of source is None
        while(len(open_list)>0):
            curr = open_list.pop() # Cell with minimum priority is popped.
            self.avg_num_cells_processed = self.avg_num_cells_processed + 1 # avg_num_cells_processed is incremented.
            closed_list.add(curr) # The popped cell is added to the priority queue 
            if(curr[0] == self.target[0] and curr[1] == self.target[1]): ## If the popped cell is target, 
                break                                                    ## the agent has planned the pah to the target
            for i in np.arange(4): ## Looping over the four neighbours of the current cell
                childx = curr[0] + dirx[i]
                childy = curr[1] + diry[i]
                child = (childx, childy)
                if(childx>=0 and childx<m and childy>=0 and childy<n and (child not in closed_list) and self.agent_grid[childx][childy]!='X'):
                    ## The above condition checks that if both the child coordinates are between [0,n-1],
                    ## it is not in closed_list, it is unblocked cell, then it will analyse the child node.
                    if(child not in visited):
                        ## If the child is not visited:
                        ## 1. Mark the child visited
                        ## 2. Mark the parent of the child
                        ## 3. Update the distance from the start node to the current node
                        ## 4. Calculate the priority of the children
                        ## 5. Push the child in the fringe
                        visited.add(child)
                        parent[child] = curr
                        open_list.g[child] = open_list.g[curr]+1
                        open_list.calculate_priority(child)
                        open_list.push(child)
                    else:
                        ## If the child is visited and it's new g val is lower than the stored g_val:
                        ## 1. Update the parent of the curr node
                        ## 2. Delete the node from the priority queue
                        ## 3. Update the measurement of cost and heuristic in priority queue
                        ## 4. Push the child in the priority queue
                        if open_list.g[curr]+1<open_list.g[child]:
                            parent[child] = curr
                            open_list._data.discard(child)
                            open_list.g[child] = open_list.g[curr]+1
                            open_list.calculate_priority(child)
                            open_list.push(child)
        ## If the fringe is empty and target node is not visited, then the agent was not able to plan a path to the target,
        ## hence planned path will be empty.
        if(self.target not in visited):
            return []
        
        ## If the agent can plan the path from source to the target based on the current knowledge, then the algorithm
        ## travels back from target to the source using parent pointers. Also, it stores the cells that it traverses
        ## in-between.
        curr = self.target
        while(curr != source):
            planned_path.append(curr)
            curr = parent[curr]
        planned_path.append(source)
        return planned_path[::-1]
    
    
    def check_planned_path(self, planned_path):
        '''
             It explores the path that agent plans using the A-star algorithm. The agent will strop traversing 
             the path when it sees a blocked cell in the path. This function returns the path when that the agent will
             be able to travel in the planned path.
        '''
        dirx = [-1, 1, 0, 0]
        diry = [0, 0, -1, 1]
        n = len(planned_path)
        traversed_path = [] # It stores the traversed path.
        ## For all the cells in the path, it updates whether the cell is blocked or not. It updates until the agent does
        ## not face blocked cell in the path.
        for i in np.arange(n):
            currx = planned_path[i][0]
            curry = planned_path[i][1]
            if(grid[currx][curry] == 'X'):
                break
            traversed_path.append((currx,curry))
            for i in np.arange(4):
                ## For every neighbour of the current node,
                ## it updates the nature of the neighbouring cell (blocked or unblocked)
                childx = currx + dirx[i]
                childy = curry + diry[i]
                if(childx>=0 and childx<self.m and childy>=0 and childy<self.n):
                    self.agent_grid[childx][childy] = self.grid[childx][childy]
        return traversed_path
    
        def check_planned_path2(self, planned_path):
        '''
            Repeated A-star: Question-8
            
            Function that optimises A-star. In Standard repeated A-star, the starting index of the next cycle of A-star
            is the last unblocked node in the traversed path.
            
            In this function, when traversing the planned_path, if the agent enters a tunnel, then in the next cycle of 
            A-star, it repeats A-star from the cell where the tunnel starts.
        '''
        dirx = [-1, 1, 0, 0]
        diry = [0, 0, -1, 1]
        n = len(planned_path)
        traversed_path = []
        start_tunnel_node = () # The node that represents the start of the tunnel
        for i in np.arange(n):
            ## For each cell in the planned_path, we calculate the number of neighbours.
            ## For the cells inside the tunnel, we will have the nodes with atleast 2 blocked neighbouring cells
            ## So, in the planned path, the last node with atleast 3 or more unblocked nodes will be the cell from
            ## which the A-star will start.
            currx = planned_path[i][0]
            curry = planned_path[i][1]
            if(self.grid[currx][curry] == 'X'):
                break
            traversed_path.append((currx,curry))
            num_neighbours = 0
            for j in np.arange(4):
                childx = currx + dirx[j]
                childy = curry + diry[j]
                if(childx>=0 and childx<self.m and childy>=0 and childy<self.n):
                    self.agent_grid[childx][childy] = self.grid[childx][childy]
                    if self.agent_grid[childx][childy] == '.':
                        num_neighbours = num_neighbours + 1
            if(num_neighbours >= 3):
                start_tunnel_node = planned_path[i]
        n = len(traversed_path)
        if(traversed_path[n-1] == self.target):
            return traversed_path, (), ()
        return traversed_path, start_tunnel_node
    
    def compute_path2(self):
        '''
            Question 8 - To compute Repeated A-star with optimisation where the source of the next cycle of
            A-star is optimised.
        '''
        path = []
        curr = self.start
        while(curr != self.target):
            planned_path = self.a_star(curr)
            if( len(planned_path) == 0 ):
                return []
            ## check_planned_path returns start_tunnel_node
            traversed_path, start_tunnel_node = self.check_planned_path2(planned_path)
            n = len(traversed_path)
            path.append(traversed_path)
            if(traversed_path[n-1] == self.target):
                break
            ## The next cycle of A-star runs on the start_tunnel_node
            curr = start_tunnel_node
        return path

    def compute_path(self):
        '''
            Returns the path that the agent will travel when it travls through the unknown grid using Repeated A-Star
            algorithm.
        '''
        path = [] ## Stores the final path that agent will travel
        curr = self.start
        while(curr != self.target):
            planned_path = self.a_star(curr) ## Returns the planned_path that the agent will traverse
            if( len(planned_path) == 0 ):
                return []
            traversed_path = self.check_planned_path(planned_path) ## Explores planned_path and returns traversed_path that 
            n = len(traversed_path)                                ## the node will actually travel
            path.append(traversed_path)                     
            if(traversed_path[n-1] == self.target):
                break
            curr = traversed_path[n-1]                             ## Start the A-star algorithm from the last unblocked 
                                                                   ## node explored in planned_path
        return path

# Repeated BFS

In [None]:
from sortedcontainers import SortedSet
from math import sqrt
from collections import deque
import numpy as np
class GridWorld2:
    '''
        This class implements Repeated BFS. In the repeated cycles of BFS, the knowledge of the agent_grid gets updated
        and the agent explores the grid by travelling through the planned path. If the agent reaches the target, it's done. 
        But if the agent encounters a block, the BFS gets called on the grid again, where source for the next BFS
        is the last unblocked cell in the planned path.
    '''
    def __init__(self, grid, start, target, n, m, agent_grid = None ):
        '''
            n,m represents the dimensions of the grid and agents grid
            trajectory refers to the overall path travelled by agent on the grid
            grid refers to the complete knowledge of the grid
            agent_gid refers to the current knowledge of the grid
            sum_num_cells_processed refers to the total number of cells processed by bfs
        '''
        self.n = n 
        self.m = m 
        self.start = start 
        self.target = target
        self.trajectory = []
        self.grid = grid 
        if agent_grid is None:
            self.agent_grid = np.full((n,n),'.')
        else:
            self.agent_grid = agent_grid 
        self.sum_num_cells_processed = 0
        
    def bfs(self, source):
        ''' 
            BFS => Breadth First Search does the level order traversal of the nodes. Queue data structure is used.
            All the cells are accessed in the order of the increasing distance from the source vertex. 
        '''
        q = deque()
        visited = set()
        dirx = [-1,1,0,0]
        diry = [0,0,1,-1]
        planned_path = []
        g = {}
        q.append(source)
        g[source] = 0
        visited.add(source)
        parent = {}
        parent[source] = None
        while(len(q)>0):
            curr = q.popleft()
            self.sum_num_cells_processed = self.sum_num_cells_processed + 1
            if( curr[0] == self.target[0] and curr[1] == self.target[1] ):
                break
            ## For each child present inside the grid, we check whether it is a block.
            ## If the child is not a block, then we update the parent of the child, distance from the source and push the
            ## child in the queue.
            for i in np.arange(4):
                childx = curr[0] + dirx[i]
                childy = curr[1] + diry[i]
                child = (childx, childy)
                if(childx>=0 and childx<self.m and childy>=0 and childy<self.n and self.agent_grid[childx][childy]!='X'):
                    if(child not in visited):
                        visited.add(child)
                        parent[child] = curr
                        g[child] = g[curr] + 1
                        q.append(child)
        if(self.target not in visited):
            return []
        curr = self.target
        while(curr!=source):
            planned_path.append(curr)
            curr = parent[curr]
        planned_path.append(source)
        return planned_path[::-1]
    
    def check_planned_path(self, planned_path):
        dirx = [-1, 1, 0, 0]
        diry = [0, 0, -1, 1]
        n = len(planned_path)
        traversed_path = []
        for i in np.arange(n):
            currx = planned_path[i][0]
            curry = planned_path[i][1]
            if(grid[currx][curry] == 'X'):
                break
            traversed_path.append((currx,curry))
            for i in np.arange(4):
                childx = currx + dirx[i]
                childy = curry + diry[i]
                if(childx>=0 and childx<self.m and childy>=0 and childy<self.n):
                    self.agent_grid[childx][childy] = self.grid[childx][childy]
        return traversed_path
    
    def compute_path(self):
        path = []
        curr = self.start
        while(curr != self.target):
            planned_path = self.bfs(curr)
            if( len(planned_path) == 0 ):
                return []
            traversed_path = self.check_planned_path(planned_path)
            n = len(traversed_path)
            path.append(traversed_path)
            if(traversed_path[n-1] == self.target):
                break
            curr = traversed_path[n-1]
        return path