In [1]:
# Imports

import heapq
import os
import pickle
import math


In [2]:
# Custom Priority Queue Class

class PriorityQueue(object):
    

    def __init__(self):

        self.queue = []
        

    def pop(self):
        
        _node = heapq.heappop(self.queue)
        
        if len(_node) == 5:
            node = (_node[0], _node[1], _node[2], _node[4])
            
        elif len(_node) == 3:
            node = (_node[0], _node[2])
            
        elif len(_node) == 2:
            node = _node[1]
            
        else:
            node = _node
        
        return node
    

    def remove(self, node):
        
        self.queue.remove(node)
        
        return self.queue
    

    def append(self, node):
        
        if type(node) == tuple:
            
            if len(node) == 2:
                
                id_ = len(self.queue) + 1
                
                heapq.heappush(self.queue, (node[0], id_, node[1]))
                
            if len(node) == 3:
                
                id_ = len(self.queue) + 1
                
                heapq.heappush(self.queue, (node[0], node[1], id_, node[2]))  
                
            if len(node) == 4:
                
                id_ = len(self.queue) +1
                
                heapq.heappush(self.queue, (node[0], node[1], node[2], id_, node[3]))   
                
        elif type(node) == str:
            
            heapq.heappush(self.queue, node)
            
        else:
            
            heapq.heappush(self.queue, node)
            
        return self.queue
    
    
    def size(self):
        
        return len(self.queue)
    

    def clear(self):

        self.queue = []

        
    def top(self):

        return self.queue[0]
    
    
    def __iter__(self):

        return iter(sorted(self.queue))
    

    def __str__(self):

        return 'PQ:%s' % self.queue
    
        
    def __contains__(self, key):

        return key in [n[-1] for n in self.queue]

    
    def __eq__(self, other):
        
        return self.queue == other.queue

    

Custom Functions

In [None]:
# Heuristics


def null_heuristic(graph, v, goal):

    return 0


def euclidean_dist_heuristic(graph, v, goal):
    
    v_coordinates = graph.nodes[v]['pos']
    
    g_coordinates = graph.nodes[goal]['pos']
    
    dist = ((v_coordinates[0] - g_coordinates[0]) ** 2 + (v_coordinates[1] - g_coordinates[1]) ** 2) ** (0.5)
    
    return dist

In [3]:
# Global class object
frontier = PriorityQueue()

# Global lists: explored (visited), path
exp_= []
path = []

The Search Algorithms

In [None]:
# BFS

def breadth_first_search(graph, start, goal):
    
    if start == goal:
        
        return exp_
    
    else:
        
        frontier.append((0, start))
        
        parent_child = {}
        
        d = 1

        while frontier.size() != 0:

            node = frontier.pop()[1]
            
            exp_.append(node)

            if node == goal:
                
                break
                
            else:
                
                neighbors = [n for n in graph.neighbors(node)]
                
                neighbors = sorted(neighbors)
                
                i = 1
                
                frontier_size = frontier.size()
                
                for neighbor in neighbors:

                    if neighbor not in expl_ and not (any(neighbor in x for x in frontier)):
                        
                        parent_child[(node, i)] = (neighbor, d)     
                        
                        if neighbor == goal:
                            
                            path = bfs_computation(graph, start, goal, parent_child)
                            
                            return path
                        
                        frontier.append((d, neighbor))
                        
                        i+=1
                
                if frontier.size() > frontier_size:
                    
                    d+=1
        
        path = bfs_computation(graph, start, goal, parent_child)
        
    return path

In [None]:
# UCS

def uniform_cost_search(graph, start, goal):
    
    if start == goal:
        return explored
    
    else:
        
        frontier.append((0, start))
        
        parent_child = {}

        while frontier.size() != 0:
            
            node = frontier.pop()

            if node[1] == goal:
                
                path = ucs_computation(graph, start, goal, parent_child)
                
                return path

            exp_.append(node[1])
            
            neighbors = [n for n in graph.neighbors(node[1])]
            
            frontier_size = frontier.size()
            
            i = 1
            
            for neighbor in neighbors:
                
                prior_weight = node[0]
                
                next_weight = graph.get_edge_weight(node[1], neighbor)
                
                weight = prior_weight + next_weight

                if neighbor not in exp_ and not (any(neighbor in x for x in frontier)):
                    
                    frontier.append((weight, neighbor)) 
                    
                    parent_child[(node[1], i)] = (neighbor, weight)
                    
                    i+=1
                    
                else:
                    
                    for f_node in frontier:
                        
                        if neighbor == f_node[2] and weight < f_node[0]:
                            
                            frontier.remove(f_node)
                            
                            frontier.append((weight, neighbor))
                            
                            parent_child[(node[1], i)] = (neighbor, weight)
                            
                            i+=1
            
        path = ucs_computation(graph, start, goal, parent_child)
    
    return path

In [None]:
# A - Star

def a_star(graph, start, goal, heuristic=euclidean_dist_heuristic):
    
    if start == goal:
        
        return exp_
    
    else:
        
        init_dist = heuristic(graph, start, goal)
        
        frontier.append((init_dist, 0, start))
        
        parent_child = {}

        while frontier.size() != 0:
            
            node = frontier.pop()

            if node[3] == goal:
                
                path = astar_computation(graph, start, goal, parent_child, heuristic)
                
                return path

            exp_.append(node[3])
            
            neighbors = [n for n in graph.neighbors(node[3])]
            
            frontier_size = frontier.size()
            
            i = 1
            
            for neighbor in neighbors:
                
                f = graph.get_edge_weight(node[3], neighbor) + node[1]
                
                g = heuristic(graph, neighbor, goal)
                
                dist = f + g

                if neighbor not in explored and not (any(neighbor in x for x in frontier)):
                    
                    frontier.append((dist, f, neighbor)) 
                    
                    parent_child[(node[3], i)] = (neighbor, dist, f)
                    
                    i+=1

                else:
                    
                    for f_node in frontier:
                        
                        if neighbor == f_node[3] and dist < f_node[0]:
                            
                            frontier.remove(f_node)
                            
                            frontier.append((dist, f, neighbor))
                            
                            parent_child[(node[3], i)] = (neighbor, dist, f)
                            
                            i+=1
                            
        path = astar_computation(graph, start, goal, parent_child, heuristic)
        
    return path