## Generating a random connected graph with 10 million cities designated with start and goal cities

In [4]:
# each city is represented as a 2D (x, y) coordinate 
# thus there is a 1000*1000 grid containing 10 million cities

import random

N, S, E, W = DIRECTIONS = [(0, 1), (0, -1), (1, 0), (-1, 0)]

def Grid(width, height, obstacles=0.2):
    """A 2-D grid, width x height, with obstacles denoting the density of obstacles, chosen at random"""
    grid = {(x, y) for x in range(width) for y in range(height)}
    if isinstance(obstacles, (float, int)):
        obstacles = random.sample(grid, int(width * height * obstacles))
    def neighbors(x, y):
        for (dx, dy) in DIRECTIONS:
            (nx, ny) = (x + dx, y + dy)
            if (nx, ny) not in obstacles and 0 <= nx < width and 0 <= ny < height:
                yield (nx, ny)
    return {(x, y): list(neighbors(x, y))
            for x in range(width) for y in range(height)}

Grid(5, 5)

{(0, 0): [(0, 1)],
 (0, 1): [(0, 2), (1, 1)],
 (0, 2): [(0, 3), (0, 1), (1, 2)],
 (0, 3): [(0, 4), (0, 2), (1, 3)],
 (0, 4): [(0, 3)],
 (1, 0): [(1, 1), (2, 0)],
 (1, 1): [(1, 2), (2, 1), (0, 1)],
 (1, 2): [(1, 3), (1, 1), (0, 2)],
 (1, 3): [(1, 2), (2, 3), (0, 3)],
 (1, 4): [(1, 3), (2, 4), (0, 4)],
 (2, 0): [(2, 1)],
 (2, 1): [(2, 0), (3, 1), (1, 1)],
 (2, 2): [(2, 3), (2, 1), (3, 2), (1, 2)],
 (2, 3): [(2, 4), (3, 3), (1, 3)],
 (2, 4): [(2, 3), (3, 4)],
 (3, 0): [(3, 1), (4, 0), (2, 0)],
 (3, 1): [(3, 2), (4, 1), (2, 1)],
 (3, 2): [(3, 3), (3, 1), (4, 2)],
 (3, 3): [(3, 4), (3, 2), (4, 3), (2, 3)],
 (3, 4): [(3, 3), (4, 4), (2, 4)],
 (4, 0): [(4, 1)],
 (4, 1): [(4, 2), (4, 0), (3, 1)],
 (4, 2): [(4, 3), (4, 1), (3, 2)],
 (4, 3): [(4, 4), (4, 2), (3, 3)],
 (4, 4): [(4, 3), (3, 4)]}

## Breadth first search

In [32]:
def breadth_first_search(problem):
    "Search for goal; paths with least number of steps first."
    if problem.is_goal(problem.initial): 
        return Node(problem.initial)
    frontier = FrontierQ(Node(problem.initial), LIFO=False)
    explored = set()
    while frontier:
        node = frontier.pop()
        explored.add(node.state)
        for action in problem.actions(node.state):
            child = node.child(problem, action)
            if child.state not in explored and child.state not in frontier:
                if problem.is_goal(child.state):
                    return child
                frontier.add(child)

## Uniform Cost Search 

In [41]:
def uniform_cost_search(problem, costfn=lambda node: node.path_cost):
    frontier = FrontierPQ(Node(problem.initial), costfn)
    explored = set()
    while frontier:
        node = frontier.pop()
        if problem.is_goal(node.state):
            return node
        explored.add(node.state)
        for action in problem.actions(node.state):
            child = node.child(problem, action)
            if child.state not in explored and child not in frontier:
                frontier.add(child)
            elif child in frontier and frontier.cost[child] < child.path_cost:
                frontier.replace(child)

## A* Search

In [83]:
def astar_search(problem, heuristic):
    costfn = lambda node: node.path_cost + heuristic(node, problem.goals)
    return uniform_cost_search(problem, costfn)

## Best First Search

In [170]:
def best_first_search(problem, heuristic):
    costfn = lambda node: heuristic(node, problem.goals)
    return uniform_cost_search(problem, costfn)

## Hill Climbing Search 

In [81]:
def hill_climbing(problem):
    """From the initial node, keep choosing the neighbor with highest value,
    stopping when no neighbor is better. [Figure 4.2]"""
    current = Node(problem.initial)
    while True:
        neighbors = current.expand(problem)
        if not neighbors:
            break
        neighbor = argmax_random_tie(neighbors,
                                     key=lambda node: problem.value(node.state))
        if problem.value(neighbor.state) <= problem.value(current.state):
            break
        current = neighbor
    return current.state

## Search Problem

In [26]:
class Problem(object):
    """The abstract class for a search problem."""

    def __init__(self, initial=None, goals=(), **additional_keywords):
        """Provide an initial state and optional goal states.
        A subclass can have additional keyword arguments."""
        self.initial = initial  # The initial state of the problem.
        self.goals = goals      # A collection of possible goal states.
        self.__dict__.update(**additional_keywords)

    def actions(self, state):
        "Return a list of actions executable in this state."
        raise NotImplementedError # Override this!

    def result(self, state, action):
        "The state that results from executing this action in this state."
        raise NotImplementedError # Override this!

    def is_goal(self, state):
        "True if the state is a goal." 
        return state in self.goals # Optionally override this!

    def step_cost(self, state, action, result=None):
        "The cost of taking this action from this state."
        return 1 # Override this if actions have different costs

In [27]:
def action_sequence(node):
    "The sequence of actions to get to this node."
    actions = []
    while node.previous:
        actions.append(node.action)
        node = node.previous
    return actions[::-1]

def state_sequence(node):
    "The sequence of states to get to this node."
    states = [node.state]
    while node.previous:
        node = node.previous
        states.append(node.state)
    return states[::-1]

## Frontiers

In [34]:
from collections import OrderedDict
import heapq

class FrontierQ(OrderedDict):
    "A Frontier that supports FIFO or LIFO Queue ordering."
    
    def __init__(self, initial, LIFO=False):
        """Initialize Frontier with an initial Node.
        If LIFO is True, pop from the end first; otherwise from front first."""
        super(FrontierQ, self).__init__()
        self.LIFO = LIFO
        self.add(initial)
    
    def add(self, node):
        "Add a node to the frontier."
        self[node.state] = node
        
    def pop(self):
        "Remove and return the next Node in the frontier."
        (state, node) = self.popitem(self.LIFO)
        return node
    
    def replace(self, node):
        "Make this node replace the nold node with the same state."
        del self[node.state]
        self.add(node)

In [36]:
class FrontierPQ:
    "A Frontier ordered by a cost function; a Priority Queue."
    
    def __init__(self, initial, costfn=lambda node: node.path_cost):
        "Initialize Frontier with an initial Node, and specify a cost function."
        self.heap   = []
        self.states = {}
        self.costfn = costfn
        self.add(initial)
    
    def add(self, node):
        "Add node to the frontier."
        cost = self.costfn(node)
        heapq.heappush(self.heap, (cost, node))
        self.states[node.state] = node
        
    def pop(self):
        "Remove and return the Node with minimum cost."
        (cost, node) = heapq.heappop(self.heap)
        self.states.pop(node.state, None) # remove state
        return node
    
    def replace(self, node):
        "Make this node replace a previous node with the same state."
        if node.state not in self:
            raise ValueError('{} not there to replace'.format(node.state))
        for (i, (cost, old_node)) in enumerate(self.heap):
            if old_node.state == node.state:
                self.heap[i] = (self.costfn(node), node)
                heapq._siftdown(self.heap, 0, i)
                return

    def __contains__(self, state): return state in self.states
    
    def __len__(self): return len(self.heap)

## Node 

In [39]:
class Node(object):
    """A node in a search tree. A search tree is spanning tree over states.
    A Node contains a state, the previous node in the tree, the action that
    takes us from the previous state to this state, and the path cost to get to 
    this state. If a state is arrived at by two paths, then there are two nodes 
    with the same state."""

    def __init__(self, state, previous=None, action=None, step_cost=1):
        "Create a search tree Node, derived from a previous Node by an action."
        self.state     = state
        self.previous  = previous
        self.action    = action
        self.path_cost = 0 if previous is None else (previous.path_cost + step_cost)

    def __repr__(self): return "<Node {}: {}>".format(self.state, self.path_cost)
    
    def __lt__(self, other): return self.path_cost < other.path_cost
    
    def child(self, problem, action):
        "The Node you get by taking an action from this Node."
        result = problem.result(self.state, action)
        return Node(result, self, action, 
                    problem.step_cost(self.state, action, result))

## Visualization Output

In [155]:
def showpath(searcher, problem):
    "Show what happens when searcvher solves problem."
    problem = Instrumented(problem)
    print('\n{}:'.format(searcher.__name__))
    result = searcher(problem)
    if result:
        actions = action_sequence(result)
        state = problem.initial
        path_cost = 0
        for steps, action in enumerate(actions, 1):
            path_cost += problem.step_cost(state, action, 0)
            result = problem.result(state, action)
            print('  {} =={}==> {}; cost {} after {} steps'
                  .format(state, action, result, path_cost, steps,
                          '; GOAL!' if problem.is_goal(result) else ''))
            state = result
    msg = 'GOAL FOUND' if result else 'no solution'
    print('{} after {} results and {} goal checks'
          .format(msg, problem._counter['result'], problem._counter['is_goal']))
    
def showpath_heuristic(searcher, problem, heuristic):
    "Show what happens when searcvher solves problem."
    problem = Instrumented(problem)
    print('\n{}:'.format(searcher.__name__))
    result = searcher(problem, heuristic)
    if result:
        actions = action_sequence(result)
        state = problem.initial
        path_cost = 0
        for steps, action in enumerate(actions, 1):
            path_cost += problem.step_cost(state, action, 0)
            result = problem.result(state, action)
            print('  {} =={}==> {}; cost {} after {} steps'
                  .format(state, action, result, path_cost, steps,
                          '; GOAL!' if problem.is_goal(result) else ''))
            state = result
    msg = 'GOAL FOUND' if result else 'no solution'
    print('{} after {} results and {} goal checks'
          .format(msg, problem._counter['result'], problem._counter['is_goal']))
    
from collections import Counter

class Instrumented:
    "Instrument an object to count all the attribute accesses in _counter."
    def __init__(self, obj):
        self._object = obj
        self._counter = Counter()
    def __getattr__(self, attr):
        self._counter[attr] += 1
        return getattr(self._object, attr)

## Grid Problem

In [67]:
class GridProblem(Problem):
    "Create with a call like GridProblem(grid=Grid(10, 10), initial=(0, 0), goal=(9, 9))"
    def actions(self, state): return DIRECTIONS
    def result(self, state, action):
        #print('ask for result of', state, action)
        (x, y) = state
        (dx, dy) = action
        r = (x + dx, y + dy)
        return r if r in self.grid[state] else state

In [146]:
gp = GridProblem(grid=Grid(5, 5, 0.3), initial=(0, 0), goals={(4, 4)})

## Applying the search algorithms to the Grid Problem

In [158]:
# %%timeit
showpath(breadth_first_search, gp)


breadth_first_search:
  (0, 0) ==(0, 1)==> (0, 1); cost 1 after 1 steps
  (0, 1) ==(0, 1)==> (0, 2); cost 2 after 2 steps
  (0, 2) ==(0, 1)==> (0, 3); cost 3 after 3 steps
  (0, 3) ==(1, 0)==> (1, 3); cost 4 after 4 steps
  (1, 3) ==(1, 0)==> (2, 3); cost 5 after 5 steps
  (2, 3) ==(1, 0)==> (3, 3); cost 6 after 6 steps
  (3, 3) ==(0, 1)==> (3, 4); cost 7 after 7 steps
  (3, 4) ==(1, 0)==> (4, 4); cost 8 after 8 steps
GOAL FOUND after 63 results and 23 goal checks


In [159]:
showpath(uniform_cost_search, gp)


uniform_cost_search:
  (0, 0) ==(0, 1)==> (0, 1); cost 1 after 1 steps
  (0, 1) ==(0, 1)==> (0, 2); cost 2 after 2 steps
  (0, 2) ==(0, 1)==> (0, 3); cost 3 after 3 steps
  (0, 3) ==(1, 0)==> (1, 3); cost 4 after 4 steps
  (1, 3) ==(1, 0)==> (2, 3); cost 5 after 5 steps
  (2, 3) ==(1, 0)==> (3, 3); cost 6 after 6 steps
  (3, 3) ==(0, 1)==> (3, 4); cost 7 after 7 steps
  (3, 4) ==(1, 0)==> (4, 4); cost 8 after 8 steps
GOAL FOUND after 72 results and 25 goal checks


## Heuristic

In [160]:
def manhattan(node, goals):    
    mhd = 0

    goal = goals.pop()
    goals.add(goal)

    for j in range(2):
        mhd = abs(goal[j] - node.state[j]) + mhd

    return mhd

In [172]:
# astar_search(gp, manhattan)

In [157]:
showpath_heuristic(astar_search, gp, manhattan)


astar_search:
  (0, 0) ==(0, 1)==> (0, 1); cost 1 after 1 steps
  (0, 1) ==(0, 1)==> (0, 2); cost 2 after 2 steps
  (0, 2) ==(0, 1)==> (0, 3); cost 3 after 3 steps
  (0, 3) ==(1, 0)==> (1, 3); cost 4 after 4 steps
  (1, 3) ==(1, 0)==> (2, 3); cost 5 after 5 steps
  (2, 3) ==(1, 0)==> (3, 3); cost 6 after 6 steps
  (3, 3) ==(0, 1)==> (3, 4); cost 7 after 7 steps
  (3, 4) ==(1, 0)==> (4, 4); cost 8 after 8 steps
GOAL FOUND after 68 results and 24 goal checks


In [171]:
showpath_heuristic(best_first_search, gp, manhattan)


best_first_search:
  (0, 0) ==(0, 1)==> (0, 1); cost 1 after 1 steps
  (0, 1) ==(0, 1)==> (0, 2); cost 2 after 2 steps
  (0, 2) ==(0, 1)==> (0, 3); cost 3 after 3 steps
  (0, 3) ==(1, 0)==> (1, 3); cost 4 after 4 steps
  (1, 3) ==(1, 0)==> (2, 3); cost 5 after 5 steps
  (2, 3) ==(1, 0)==> (3, 3); cost 6 after 6 steps
  (3, 3) ==(0, 1)==> (3, 4); cost 7 after 7 steps
  (3, 4) ==(1, 0)==> (4, 4); cost 8 after 8 steps
GOAL FOUND after 48 results and 19 goal checks
