In [None]:
import math
from collections import defaultdict, deque, Counter
from queue import PriorityQueue,Queue
FIFOQueue = deque
LIFOQueue = list

class Node:
    "A Node in a search tree."
    def __init__(self, state, parent=None, action=None, path_cost=0):
        self.__dict__.update(state=state, parent=parent, action=action, path_cost=path_cost)

    def __repr__(self): return '{}'.format(self.state) # represent the path state
    def __len__(self): return 0 if self.parent is None else (1 + len(self.parent))
    def __lt__(self, other): return self.path_cost < other.path_cost

    
    
failure = Node('failure', path_cost=math.inf) # Indicates an algorithm couldn't find a solution.
cutoff  = Node('cutoff',  path_cost=math.inf) # Indicates iterative deepening search was cut off.



    
    
def expand(problem, node):  
    s = node.state
    for action in problem.actions(s):
        s1 = problem.result(s, action)
        cost = node.path_cost + problem.action_cost(s, action, s1)
        yield Node(s1, node, action, cost)
        

: 

In [3]:
import pandas as pd

# Map data
edges_data = {
    'Start_Node': ['O', 'O', 'A', 'A', 'A', 'L', 'L', 'D', 'C', 'C', 'C', 'R', 'F', 'B', 'B', 'B', 'B', 'H', 'E', 'U', 'I', 'I', 'P'],
    'End_Node': ['Z', 'S', 'Z', 'S', 'T', 'T', 'M', 'M', 'D', 'R', 'P', 'S', 'S', 'F', 'P', 'G', 'U', 'U', 'H', 'V', 'V', 'N', 'R'],
    'Cost': [71, 151, 75, 140, 118, 111, 70, 75, 120, 146, 138, 80, 99, 211, 101, 90, 85, 98, 86, 142, 92, 87, 97]
}

edges_df = pd.DataFrame(edges_data)

print("Edges DataFrame:")
print(edges_df)



Edges DataFrame:
   Start_Node End_Node  Cost
0           O        Z    71
1           O        S   151
2           A        Z    75
3           A        S   140
4           A        T   118
5           L        T   111
6           L        M    70
7           D        M    75
8           C        D   120
9           C        R   146
10          C        P   138
11          R        S    80
12          F        S    99
13          B        F   211
14          B        P   101
15          B        G    90
16          B        U    85
17          H        U    98
18          E        H    86
19          U        V   142
20          I        V    92
21          I        N    87
22          P        R    97


In [4]:
# The pandas version

class Map:
    """Bản đồ của các địa điểm: một đồ thị với các đỉnh và liên kết giữa chúng. 
    Trong `Map(links, locations)`, `links` là DataFrame có cột 'Start_Node', 'End_Node' và 'Cost'. 
    Nếu `directed=False` thì đối với mỗi liên kết (v1, v2), chúng ta thêm một liên kết (v2, v1)."""

    def __init__(self, links, directed=False):
        if isinstance(links, pd.DataFrame):
            links_dict = {(row['Start_Node'], row['End_Node']): row['Cost'] for _, row in links.iterrows()}  
        if not directed:
            links_dict = {**links_dict, **{(v2, v1): cost for (v1, v2), cost in links_dict.items()}}
        
        
        self.distances = links_dict
        self.neighbors = self.create_neighbors(links_dict)
        
            
    def create_neighbors(self, links):
        """Given (key, val) pairs, make a dict of {key: [val,...]}."""
        result = defaultdict(list)
        for (v1, v2) ,_ in links.items():
            result[v1].append(v2)       
        return result


In [5]:
map_object = Map(edges_df,False)

In [234]:
map_object.distances['A', 'Z']

75

In [6]:
class RomaniaProblem:
    """A problem to find a route between locations on a `Map`.
    Create a problem with RomaniaProblem(start, goal, map=Map(...)})."""

    def __init__(self, initial=None, goal=None, map=None):
        self.initial = initial
        self.goal = goal
        self.map = map

    def actions(self, state):
        """The places neighboring `state`."""
        return self.map.neighbors[state]

    def result(self, state, action):
        """Go to the `action` place, if it possible."""
        return action if action in self.map.neighbors[state] else state

    def is_goal(self, state):
        """Check if the state is the goal state."""
        return state == self.goal

    def action_cost(self, s, action, s1):
        """The distance (cost) to go from s to s1."""
        return self.map.distances[s, s1]


    def __str__(self):
        return '{}({!r}, {!r})'.format(type(self).__name__, self.initial, self.goal)

    

In [7]:
problem = RomaniaProblem('A', 'B', map=map_object)

# Bidi Search 


start from two side -> use BFS to search till come meet each other ?

In [201]:
import copy

class CountCalls:
    """Delegate all attribute gets to the object, and count them in ._counts"""
    def __init__(self, obj):
        self._object = obj
        self._counts = Counter()
        
    def __getattr__(self, attr):
        "Delegate to the original object, after incrementing a counter."
        self._counts[attr] += 1
        return getattr(self._object, attr)

def path_cost(n): return n.path_cost
def inverse_problem(problem):
    if isinstance(problem, CountCalls):
        return CountCalls(inverse_problem(problem._object))
    else:
        inv = copy.copy(problem)
        inv.initial, inv.goal = inv.goal, inv.initial
        return inv

In [241]:
"""Additional function for solving Bidi Search"""
def join_nodes(nf, nb):
    """Join the reverse of the backward node nb to the forward node nf."""
    #print('join', S(nf), S(nb))
    join = nf
    while nb.parent is not None:
        cost = join.path_cost + nb.path_cost - nb.parent.path_cost
        join = Node(nb.parent.state, join, nb.action, cost)
        nb = nb.parent
        def S(node):
            return str(path_states(node))
        print('  now join', S(join), 'with nb', S(nb), 'parent', S(nb.parent))
    return join
def path_states(node):
    "The sequence of states to get to this node."
    if node in (cutoff, failure, None): 
        return []
    return path_states(node.parent) + [node.state]

def proceed(direction, problem, frontier, reached, reached2, solution):
    node = frontier.get()
    for child in expand(problem, node):
        s = child.state
        print('proceed', direction, path_states(child))
        if s not in reached or child.path_cost < reached[s].path_cost:
            reached[s] = child
            frontier.put(child)
              
            if s in reached2: # Frontiers collide; solution found
                solution2 = (join_nodes(child, reached2[s]) if direction == 'f' else
                             join_nodes(reached2[s], child))
                if solution2.path_cost < solution.path_cost:
                    solution = solution2
    return solution
def terminated(solution, frontier_f, frontier_b):
        n_f, n_b = frontier_f.queue[0], frontier_b.queue[0]
        return path_cost(n_f) + path_cost(n_b) > path_cost(solution)

In [242]:

from queue import PriorityQueue

def bidirectional_best_first_search(problem_f, f_f, problem_b, f_b, terminated):
    """
    BIBF-SEARCH( problemF , fF , problemB , fB ) returns a solution node, or failure
    
    nodeF ← NODE(problemF.INITIAL)  # Node for a start state
    
    nodeB ← NODE(problemB.INITIAL)  # Node for a goal state
    
    frontierF ← a priority queue ordered by fF, with nodeF as an element
    
    frontierB ← a priority queue ordered by fB, with nodeB as an element
    
    reachedF ← a lookup table, with one key nodeF.STATE and value nodeF
    
    reachedB ← a lookup table, with one key nodeB.STATE and value nodeB
    
    solution ← failure

    while not TERMINATED(solution, frontierF, frontierB):
        if fF(TOP(frontierF)) < fB(TOP(frontierB)):
            solution = PROCEED('f', problemF, frontierF, reachedF, reachedB, solution)
        else:
            solution = PROCEED('b', problemB, frontierB, reachedB, reachedF, solution)

    return solution"""
    node_f = Node(problem_f.initial)
    node_b = Node(problem_f.goal)
    frontier_f = PriorityQueue()
    frontier_b = PriorityQueue()
    frontier_f.put(node_f)
    frontier_b.put(node_b)
    reached_f = {problem_f.initial: node_f}
    reached_b = {problem_b.initial: node_b}
    solution = failure

    while frontier_f and frontier_b and not terminated(solution, frontier_f, frontier_b):
        def S1(node, f):
            return str(int(f(node))) + ' ' + str(path_states(node))
        print('Bi:', S1(frontier_f.queue[0], f_f), S1(frontier_b.queue[0], f_b))
        if f_f(frontier_f.queue[0]) < f_b(frontier_b.queue[0]):
            solution = proceed('f', problem_f, frontier_f, reached_f, reached_b, solution)
        else:
            solution = proceed('inverse f', problem_b, frontier_b, reached_b, reached_f, solution)
    return solution
def bidi_search(problem):
    return bidirectional_best_first_search(problem, path_cost, inverse_problem(problem), path_cost, terminated)



In [225]:
def report(searchers, problems):
    """Summary report"""
    for searcher in searchers:
        print(searcher.__name__ + ':')
        total_counts = Counter()
        for p in problems:
            prob   = CountCalls(p)
            soln   = searcher(prob)
            counts = prob._counts; 
            counts.update(actions=len(soln), cost=soln.path_cost)
            total_counts += counts
            report_counts(counts, str(p)[:40])
        
def report_counts(counts, name):
    """Print one line of the counts report."""
    print('{} | {:9,d} nodes |{:9,d} goal |{:5.0f} Total path cost |{:8,d} Node visited | {}\n'.format("Sucess" if counts['cost'] != math.inf else "Fail",
          counts['result'], counts['is_goal'], counts['cost'], counts['actions'], name))

In [243]:
report([bidi_search], [problem])

bidi_search:
Bi: 0 ['A'] 0 ['B']
proceed inverse f ['B', 'F']
proceed inverse f ['B', 'P']
proceed inverse f ['B', 'G']
proceed inverse f ['B', 'U']
Bi: 0 ['A'] 85 ['B', 'U']
proceed f ['A', 'Z']
proceed f ['A', 'S']
proceed f ['A', 'T']
Bi: 75 ['A', 'Z'] 85 ['B', 'U']
proceed f ['A', 'Z', 'O']
proceed f ['A', 'Z', 'A']
Bi: 118 ['A', 'T'] 85 ['B', 'U']
proceed inverse f ['B', 'U', 'V']
proceed inverse f ['B', 'U', 'B']
proceed inverse f ['B', 'U', 'H']
Bi: 118 ['A', 'T'] 90 ['B', 'G']
proceed inverse f ['B', 'G', 'B']
Bi: 118 ['A', 'T'] 101 ['B', 'P']
proceed inverse f ['B', 'P', 'R']
proceed inverse f ['B', 'P', 'C']
proceed inverse f ['B', 'P', 'B']
Bi: 118 ['A', 'T'] 183 ['B', 'U', 'H']
proceed f ['A', 'T', 'A']
proceed f ['A', 'T', 'L']
Bi: 140 ['A', 'S'] 183 ['B', 'U', 'H']
proceed f ['A', 'S', 'O']
proceed f ['A', 'S', 'A']
proceed f ['A', 'S', 'R']
  now join ['A', 'S', 'R', 'P'] with nb ['B', 'P'] parent ['B']
  now join ['A', 'S', 'R', 'P', 'B'] with nb ['B'] parent []
proceed

In [245]:
def out_solution(algorithms, problems):
    for algorithm in algorithms:
        for problem in problems:
            print(algorithm.__name__ + ':')
            result = algorithm(problem)
            if isinstance(result, tuple):
                solution_node = result[0]
            else:
                solution_node = result
            
            if solution_node:
                path = []
                while solution_node:
                    path.insert(0, solution_node.state)
                    solution_node = solution_node.parent

                path.insert(0, problem.initial)
                print("Solution Path from {}: {}".format(problem.initial, ' -> '.join(path)))
                print('+' * 50+ '\n')
            else:
                print("No solution found.")
                print('+' * 50+ '\n')



In [246]:
out_solution([breadth_first_search,uniform_cost_search,depth_first_search,depth_limited_search,iterative_deepening_search,bidi_search], [problem])

breadth_first_search:
The set of frontier : deque([A])
Visited: {'A'}
The set of frontier : deque([Z, S, T])
Visited: {'Z', 'T', 'A', 'S'}
The set of frontier : deque([S, T, O])
Visited: {'O', 'A', 'T', 'S', 'Z'}
The set of frontier : deque([T, O, R, F])
Visited: {'O', 'R', 'A', 'T', 'S', 'Z', 'F'}
The set of frontier : deque([O, R, F, L])
Visited: {'O', 'R', 'A', 'T', 'S', 'L', 'Z', 'F'}
The set of frontier : deque([R, F, L])
Visited: {'O', 'R', 'A', 'T', 'S', 'L', 'Z', 'F'}
The set of frontier : deque([F, L, C, P])
Visited: {'P', 'O', 'C', 'R', 'A', 'T', 'S', 'L', 'Z', 'F'}
Solution Path from A: A -> S -> F -> B
++++++++++++++++++++++++++++++++++++++++++++++++++

uniform_cost_search:
The set of frontier : []
Visited: dict_keys(['A'])
The set of frontier : [(118, T), (140, S)]
Visited: dict_keys(['A', 'Z', 'S', 'T'])
The set of frontier : [(140, S), (146, O)]
Visited: dict_keys(['A', 'Z', 'S', 'T', 'O'])
The set of frontier : [(146, O), (229, L)]
Visited: dict_keys(['A', 'Z', 'S', 'T'