In [1]:
import math
import heapq
import networkx as nx
from itertools import combinations
import matplotlib.pyplot as plt
import random
import unittest

In [2]:
def aux_path_search(G, source, target, removed):
    heap = [(0, source)]
    distances = {source: 0}
    while heap:
        cost, node = heapq.heappop(heap)
        if node == target:
            return cost
        for neighbor in G.neighbors(node):
            if neighbor == removed:
                continue
            edge_weight = G[node][neighbor].get('weight', 1)
            new_cost = cost + edge_weight
            if new_cost < distances.get(neighbor, math.inf):
                distances[neighbor] = new_cost
                heapq.heappush(heap, (new_cost, neighbor))

    return math.inf

In [3]:
def compute_edge_diff(G, v):
    neighbors = list(G.neighbors(v))
    shortcuts_added = 0
    for i in range(len(neighbors)):
        for j in range(i + 1, len(neighbors)):
            u = neighbors[i]
            w = neighbors[j]
            cost_uv = G[u][v].get('weight') if G.has_edge(u, v) else math.inf
            cost_vw = G[v][w].get('weight') if G.has_edge(v, w) else math.inf
            via_cost = cost_uv + cost_vw
            aux_path_cost = aux_path_search(G, u, w, v)
            if aux_path_cost > via_cost:
                shortcuts_added += 1
    edges_removed = len(neighbors)
    return shortcuts_added - edges_removed, shortcuts_added

In [4]:
def contract_node(G_work, v, current_rank, shortcuts, rank_dict, aux_g):
    neighbors = list(G_work.neighbors(v))
    for u, w in combinations(neighbors, 2):
        if not (G_work.has_edge(u, v) and G_work.has_edge(v, w)):
            continue

        cost_uv = G_work[u][v]['weight']
        cost_vw = G_work[v][w]['weight']
        via_cost = cost_uv + cost_vw
        aux_path_cost = aux_path_search(G_work, u, w, v)
        if aux_path_cost > via_cost:  # Shortcut is needed
            if (u, w) in shortcuts and shortcuts[(u, w)]['weight'] > via_cost:
                aux_g[u][w].update({'weight': via_cost, 'via': v})
                G_work[u][w].update({'weight': via_cost, 'via': v})
                shortcuts[(u, w)]['weight'] = via_cost
                shortcuts[(u, w)]['via'].append(v)
            else:
                shortcuts[(u, w)] = {'weight': via_cost, 'via': [v]}
                aux_g.add_edge(u, w, weight=via_cost, shortcut=True, via=v)
                G_work.add_edge(u, w, weight=via_cost, shortcut=True, via=v)

    rank_dict[v] = current_rank
    G_work.remove_node(v)

In [5]:
def build_contraction_hierarchy_offline(G):
    G_work = G.copy()
    G_aux = G.copy()
    shortcuts = {}
    rank_dict = {}

    edge_diffs = {
        v: (compute_edge_diff(G, v)[0], G.degree(v))  # (shortcut_count, degree)
        for v in G.nodes()
    }
    order = sorted(edge_diffs, key=lambda v: (edge_diffs[v][0], edge_diffs[v][1]))
    current_rank = 0
    for v in order:
        contract_node(G_work, v, current_rank, shortcuts, rank_dict, G_aux)
        current_rank += 1

    return G_aux, rank_dict, order, shortcuts

In [6]:
def build_contraction_hierarchy_online(G):
    G_work = G.copy()
    G_aux = G.copy()
    shortcuts = {}
    rank_dict = {}
    current_rank = 0
    process_order = []

    edge_diffs = {v: (compute_edge_diff(G_work, v)[0], G_work.degree(v)) for v in G_work.nodes()}

    while G_work.nodes():
        # Select the node with the smallest edge difference. Tie-break by node degree.
        v_min = min(edge_diffs, key=lambda v: (edge_diffs[v][0], edge_diffs[v][1]))

        v_neighbors = list(G_work.neighbors(v_min))
        edge_diffs.pop(v_min)

        process_order.append(v_min)
        contract_node(G_work, v_min, current_rank, shortcuts, rank_dict, G_aux)
        current_rank += 1

        # update edge difference dict
        for node in v_neighbors:
            if node in edge_diffs:
                edge_diffs[node] = compute_edge_diff(G_work, node)[0], G_work.degree(node)

    return G_aux, rank_dict, process_order, shortcuts

In [7]:
def bidirectional_dijkstra(aux_g, rank, source, target):
    forward_heap = [(0, source)]
    reverse_heap = [(0, target)]

    forward_dist = {source: 0}
    reverse_dist = {target: 0}
    processed_forward = set()
    processed_reverse = set()

    best_path = math.inf
    meeting_point = None

    while forward_heap or reverse_heap:
        # ------ Forward Search ------
        if len(forward_heap) != 0:
            cost_f, node_f = heapq.heappop(forward_heap)
            if node_f in processed_forward:
                continue
            processed_forward.add(node_f)

            for neighbor in aux_g.neighbors(node_f):
                if rank[node_f] > rank[neighbor]:
                    continue

                weight = aux_g[node_f][neighbor]['weight']
                new_cost = cost_f + weight

                if new_cost < forward_dist.get(neighbor, math.inf):
                    forward_dist[neighbor] = new_cost
                    heapq.heappush(forward_heap, (new_cost, neighbor))

        # ------ Reverse Search ------
        if len(reverse_heap) != 0:
            cost_r, node_r = heapq.heappop(reverse_heap)
            if node_r in processed_reverse:
                continue
            processed_reverse.add(node_r)

            for neighbor in aux_g.neighbors(node_r):
                if rank[node_r] > rank[neighbor]:
                    continue

                weight = aux_g[node_r][neighbor]['weight']
                new_cost = cost_r + weight

                if new_cost < reverse_dist.get(neighbor, math.inf):
                    reverse_dist[neighbor] = new_cost
                    heapq.heappush(reverse_heap, (new_cost, neighbor))

        # Check for Overlap
        common_nodes = processed_forward & processed_reverse
        for node in common_nodes:
            path_cost = forward_dist[node] + reverse_dist[node]
            if path_cost < best_path:
                best_path = path_cost
                meeting_point = node

    return best_path, meeting_point

In [8]:
def generate_random_weighted_graph(num_nodes_min=10, num_nodes_max=30, edge_factor_min=2, edge_factor_max=10,
                                   weight_min=1, weight_max=10):
    """Generates a random graph with random edge weights."""
    num_nodes = random.randint(num_nodes_min, num_nodes_max)
    num_edges = random.randint(num_nodes * edge_factor_min, num_nodes * edge_factor_max)

    G = nx.gnm_random_graph(num_nodes, num_edges)

    # Add random weights to edges
    for u, v in G.edges():
        G.edges[u, v]['weight'] = random.randint(weight_min, weight_max)

    return G

In [22]:
class TestCHDijkstra(unittest.TestCase):
    max_iters = 1000

    def test_offline(self):
        for _ in range(self.max_iters):
            G = generate_random_weighted_graph()
            source = random.choice(list(G.nodes))
            target = random.choice(list(G.nodes))

            try:
                control_sp = nx.shortest_path_length(G, source=source, target=target, weight='weight')
            except nx.exception.NetworkXNoPath:
                print("No possible path from s->t")
                continue # skip this graph

            aux_g_offline, rank_offline, process_order_offline, shortcuts_offline = build_contraction_hierarchy_offline(G)
            offline_path, _ = bidirectional_dijkstra(aux_g_offline, rank_offline, source, target)
            self.assertEqual(offline_path, control_sp, "Offline CH incorrect path length")
            

    def test_online(self):
        for _ in range(self.max_iters):
            G = generate_random_weighted_graph()
            source = random.choice(list(G.nodes))
            target = random.choice(list(G.nodes))

            try:
                control_sp = nx.shortest_path_length(G, source=source, target=target, weight='weight')
            except nx.exception.NetworkXNoPath:
                print("No possible path from s->t")
                continue # skip this graph
                
            aux_g_online, rank_online, process_order_online, shortcuts_online = build_contraction_hierarchy_online(G)
            online_path, _ = bidirectional_dijkstra(aux_g_online, rank_online, source, target)
            self.assertEqual(online_path, control_sp, "Online CH incorrect path length")

In [None]:
unittest.TextTestRunner().run(unittest.defaultTestLoader.loadTestsFromTestCase(TestCHDijkstra))

F