In [52]:
# wrapper for test methods for catching exceptions

def testWrap(method):
    mname = method.__name__
    def wrapper(*args, **kw):
        try:
            result = method(*args, **kw)
        except:
            from traceback import print_exc
            print ("{} ran into exception\n".format(method.__name__))
            print_exc()
            return False
        return result
    return wrapper


In [53]:
# implementation of a fifo queue using a singly linked list
# The nodes do have a prev pointer that can be used to extend
# them to a doubly linked list (if needed for some other application)

class Queue(object):
    def __init__(self, items=()):
        self.head = None
        self.last = None
        self.length = 0

        for item in items:
            self.add(item)

    def add(self, el):
        self.length += 1
        newnode = Node(el)
        if self.last:
            self.last.next = newnode
        else:
            self.head = newnode
        newnode.next = None
        self.last = newnode

    def remove(self):
        if self.head is None:
            raise Exception('Cannot remove from empty queue')
        self.length -= 1
        first, second = self.head, self.head.next
        if second is None:
            self.head = self.last = None
        else:
            self.head = second
        return first.data

    def __str__(self):
        if self.head is None:
            return "-Queue Empty-"

        curr = self.head
        elements = []
        while curr != None:
            elements.append(str(curr.data))
            curr = curr.next
        return "[Queue: {}]".format(", ".join(elements))

    def __len__(self):
        return self.length

    def __iter__(self):
        return self

    def __next__(self):
        if self.length:
            return self.remove()
        raise StopIteration


class Node(object):
    def __init__(self, data):
        self.data = data
        self.next = None
        self.prev = None

In [54]:
@testWrap
def testQueue():
    q = Queue()
    q.add(10)
    q.add(20)
    r = q.remove()
    assert(r == 10)
    q.add(30)
    q.add(40)
    r = q.remove()
    assert(r == 20)
    assert(len(q) == 2)
    assert([r for r in q] == [30, 40])
    return True


result = testQueue()
if result:
    print('Queue: Test Passed')

Queue: Test Passed


In [64]:
def bfsPath(G, n1, n2):
    if n1 == n2:
        return [n1]
    
    nodes = G.nodes()
    numNodes = len(nodes)
    previous = {x:None for x in nodes}
    queue = Queue([nodes[n1]])

    done = False
    for node1 in queue:
        for node2 in G.neighbors(node1):
            if not previous[node2]:
                queue.add(node2)
                previous[node2] = node1
                if node2 == n2:
                    done = True
                    break
        if done:
            break;

    curr = n2
    path = [curr]
    while curr != n1 :
        curr = previous[curr]
        if curr is None:
            return None
        path.append(curr)
    
    return list(reversed(path))


In [65]:
import networkx as nx
import random

@testWrap
def testGraphPathSearch():
    # create a random graph
    numNodes = 500
    G = nx.gnp_random_graph(numNodes,0.01)

    n1, n2 = [random.randrange(0, numNodes) for x in range(2)]

    # call bfs to get the shortest path
    try:
        myPath = bfsPath(G, n1, n2)
    except:
        #print("bfsPath did not find a path")
        myPath = None

    # get path from known method
    try:
        nxPath = nx.shortest_path(G, n1, n2)
    except:
        #print("nx did not find a path")
        nxPath = None

    if myPath == nxPath:
        #print ("BFS = {}\nNX = {}".format(myPath, nxPath))
        #print ("Test passed")
        return True
    if not None in (myPath, nxPath) and len(myPath) == len(nxPath):
        #print ("BFS = {}\nNX = {}".format(myPath, nxPath))
        #print ("Test passed")
        return True

    print ("BFS = {}\nNX = {}".format(myPath, nxPath))
    print ("*********** Test FAILED ***********")
    return False

numTests = 100
numPassed = sum((1 if testGraphPathSearch() else 0) for x in range(numTests))
print ("{} of {} tests PASSED".format(numPassed, numTests))

100 of 100 tests PASSED


In [None]:
import random
import networkx as nx
G = nx.gnp_random_graph(300,0.009)
n1, n2 = random.randrange(0, 300), random.randrange(0, 300)
print(n1, n2)
print("shortest path = {}".format(nx.shortest_path(G, n1, n2)))

import matplotlib.pyplot as plt
import pylab
pylab.rcParams['figure.figsize'] = (10.0, 8.0)
plt.title("Degree rank plot")

xx = 0.44
plt.axes([xx,xx,xx,xx])
Gcc=sorted(nx.connected_component_subgraphs(G), key = len, reverse=True)[0]
pos=nx.spring_layout(Gcc)
plt.axis('off')
nx.draw_networkx_nodes(Gcc,pos,node_size=20)
nx.draw_networkx_edges(Gcc,pos,alpha=0.4)

plt.show()