# The generic shortest path algorithm.

## Introduction to optimization and operations research.

Michel Bierlaire


In [None]:

from typing import Any

import numpy as np
import pandas as pd
from IPython.core.display_functions import display
from matplotlib import pyplot as plt
from networkx import DiGraph, set_node_attributes
from teaching_optimization.networks import draw_network
from teaching_optimization.networks.shortest_path_algorithm import (
    ShortestPathAlgorithm,
)


The objective of this exercise is to implement and apply the generic shortest path algorithm to identify the shortest paths from a node
to all other nodes.

# Some useful functions

Before implementing the algorithm, we investigate some useful functions of the network representation
and illustrate them on an example.

We first define an example network.

In [None]:
# Coordinates of each node, used when drawing the network.
positions = {
    'a': (0, 0),
    'b': (2, 1.5),
    'c': (2, -1.5),
    'd': (4, 1.5),
    'e': (4, -1.5),
    'f': (6, 0),
}

# The nodes are the keys of the dict of positions.
nodes = list(positions)

# Each arc is described by its upstream node, its downstream node, and its cost.
arcs = [
    ('a', 'b', -1),
    ('a', 'c', 3),
    ('b', 'd', 7),
    ('b', 'e', 5),
    ('c', 'b', -9),
    ('e', 'c', -7),
    ('d', 'e', 4),
    ('d', 'f', 3),
    ('e', 'f', -2),
]

# Build the directed graph: first the nodes with their position, then the arcs with their cost.
first_network: DiGraph = DiGraph()
for node, coordinates in positions.items():
    first_network.add_node(node, pos=coordinates)
first_network.add_weighted_edges_from(arcs, weight='cost')

# Draw the network, labelling each arc with its cost.
fig, ax = plt.subplots(figsize=(8, 6))
draw_network(the_network=first_network, attr_edge_labels='cost', ax=ax)
plt.show()


Here is how to obtain the list of arcs and associated data (here, the cost).

In [None]:
# With data=True, each arc is reported together with the dict of its attributes.
first_arcs = first_network.edges(data=True)
for arc in first_arcs:
    print(arc)


It means that, in order to access the information about an arc, you need to perform the
following statements.

In [None]:

for arc in first_arcs:
    # Each arc is a triplet: upstream node, downstream node, dict of attributes.
    # First entry: the upstream node
    upstream_node_arc = arc[0]
    # Second entry: the downstream node
    downstream_node_arc = arc[1]
    # Third entry: the attributes, where the cost is stored under the key 'cost'
    cost_arc = arc[2]['cost']
    print(f'Cost {upstream_node_arc} -> {downstream_node_arc} = {cost_arc}')


Here is how to obtain the list of nodes and associated data (here, the position).

In [None]:
# With data=True, each node is reported together with the dict of its attributes.
first_nodes = first_network.nodes(data=True)
for node in first_nodes:
    print(node)


The data is accessed as follows.

In [None]:
for node in first_nodes:
    # Each node is a pair: the name of the node, and the dict of its attributes.
    node_name = node[0]
    # The position is stored under the key 'pos'.
    node_position = node[1]['pos']
    print(f'Coordinates of {node_name}: {node_position}')


Given a node, we can obtain the list of outgoing arcs.

From node 'a'

In [None]:
# Arcs leaving node 'a', each reported together with the dict of its attributes.
outgoing_arcs_from_a = first_network.out_edges('a', data=True)
for arc in outgoing_arcs_from_a:
    print(arc)


From node 'd'

In [None]:
# Arcs leaving node 'd', each reported together with the dict of its attributes.
outgoing_arcs_from_d = first_network.out_edges('d', data=True)
for arc in outgoing_arcs_from_d:
    print(arc)



# Shortest path algorithm

Now we implement the shortest path algorithm. Replace the `????`

In [None]:


def shortest_path_algorithm(
    the_network: DiGraph, the_cost: str, the_origin: Any
) -> tuple[dict[Any:float] | None, dict[Any, Any | None] | None, pd.DataFrame]:
    """

    :param the_network: the_network representation
    :param the_cost: name of the cost parameter
    :param the_origin: node at the origin.
    :return:a dict associated each node with their optimal label (or None if problem is unbounded), a dict associating
        each node with its predecessor in the shortest path (or None if the problem is unbounded)
        and a data frame describing the iterations
    """

    # Initialize the labels and the predecessors
    labels = {name: np.inf for name in the_network.nodes}
    labels[the_origin] = 0
    predecessors = {name: None for name in the_network.nodes}
    the_arcs = the_network.edges(data=True)
    # Identify the lowest cost in order to establish a lower bound on the labels. This is used to detect if the
    # problem is unbounded because the the_network contains cycles with negative cost.
    arc_with_lowest_cost = min(the_arcs, key=lambda x: x[2][the_cost])
    lowest_cost = arc_with_lowest_cost[2][the_cost]
    lower_bound = (the_network.number_of_edges() - 1) * lowest_cost

    # Initialize the set of nodes to be treated
    nodes_to_be_treated = {the_origin}

    iteration_number = 0

    reporting_iteration: list[dict[str:Any]] = list()

    # Loop until the set of nodes is empty.
    while nodes_to_be_treated:
        row = {
            'Iteration': iteration_number,
            'Set': str(nodes_to_be_treated),
        }
        # Select the node to be treated during this iteration
        current_node = nodes_to_be_treated.pop()
        row['Node'] = current_node
        for node, label in labels.items():
            row[node] = label
        reporting_iteration.append(row)
        # Consider the list of outgoing arc
        outgoing_arcs = the_network.out_edges(
            current_node, data=True
        )
        for arc in outgoing_arcs:
            upstream_node = arc[0]
            downstream_node = arc[1]
            cost = arc[2][the_cost]
            # Check if the label must be updated
            if (
                labels[downstream_node] > labels[upstream_node] + cost
            ):
                # Update the label
                labels[downstream_node] = (
                    labels[upstream_node] + cost
                )
                # Update the predecessor
                predecessors[
                    downstream_node
                ] = upstream_node
                # Check for a cycle with negative cost
                if (
                    labels[downstream_node] < 0
                    and labels[downstream_node] < lower_bound
                ):
                    print('The the_network contains a cycle with negative cost.')
                    return None, None, pd.DataFrame(reporting_iteration)
                # Update the set of nodes to be treated
                nodes_to_be_treated.add(
                    downstream_node
                )
        iteration_number += 1

    row = {'Iteration': iteration_number, 'Set': '{}', 'Node': ''}
    for node, label in labels.items():
        row[node] = label
    reporting_iteration.append(row)
    return labels, predecessors, pd.DataFrame(reporting_iteration)



# First example
We run the algorithm on the example above.

In [None]:
# The function returns the labels, the predecessors, and the data frame reporting the iterations.
optimal_labels, predecessors, iterations = shortest_path_algorithm(
    the_network=first_network, the_cost='cost', the_origin='a'
)


Optimal labels
If None, it means that the network contains a cycle with negative cost.
It is the case with this example.

In [None]:
# The labels are None when a cycle with negative cost has been detected.
display(optimal_labels)


Description of the iterations

In [None]:
# One row per iteration: the set of nodes to be treated, the selected node, and the label of each node.
display(iterations)


# Second example

In [None]:
# Coordinates of each node of the second example, used when drawing the network.
positions = {
    'a': (0, 1),
    'b': (1, 2),
    'c': (1, 0),
    'd': (2, 2),
    'e': (3, 1),
    'f': (2, 0),
    'g': (4, 2),
    'h': (4, 0),
    'i': (5, 1),
}

# The nodes are the keys of the dict of positions.
nodes = list(positions.keys())

# Each arc is described by its upstream node, its downstream node, and its cost.
arcs = [
    ('a', 'b', 10),
    ('a', 'c', 12),
    ('b', 'd', -12),
    ('b', 'f', 4),
    ('c', 'd', 7),
    ('c', 'b', 8),
    ('c', 'f', 6),
    ('d', 'g', 16),
    ('d', 'e', -3),
    ('f', 'd', 7),
    ('e', 'g', 7),
    ('e', 'i', -1),
    ('e', 'h', 6),
    ('e', 'f', -4),
    ('f', 'h', 15),
    ('g', 'i', 8),
    ('h', 'i', 5),
]



We plot the network.

In [None]:
# Build the directed graph: first the nodes with their position, then the arcs with their cost.
second_network: DiGraph = DiGraph()
for node, coordinates in positions.items():
    second_network.add_node(node, pos=coordinates)
second_network.add_weighted_edges_from(arcs, weight='cost')

# Draw the network, labelling each arc with its cost.
fig, ax = plt.subplots(figsize=(8, 6))
draw_network(the_network=second_network, attr_edge_labels='cost', ax=ax)
plt.show()


In [None]:
# The function returns the labels, the predecessors, and the data frame reporting the iterations.
optimal_labels, predecessors, iterations = shortest_path_algorithm(
    the_network=second_network, the_cost='cost', the_origin='a'
)


Optimal labels
If None, it means that the network contains a cycle with negative cost.

In [None]:
# The labels are None when a cycle with negative cost has been detected.
display(optimal_labels)


Predecessors

In [None]:
# For each node, the node that precedes it on the shortest path (None if there is no predecessor).
display(predecessors)


We add the optimal labels as attributes of the nodes

In [None]:
# The labels are available only if no cycle with negative cost has been detected.
if optimal_labels is not None:
    # Store the label of each node as its attribute 'label'.
    set_node_attributes(second_network, optimal_labels, 'label')


Description of the iterations

In [None]:
# One row per iteration: the set of nodes to be treated, the selected node, and the label of each node.
display(iterations)



We now write a recursive function to identify the shortest paths

In [None]:
def shortest_path(node: Any, the_predecessors: dict[Any, Any | None]) -> str:
    """Print the shortest path to a given node, recursively"""

    # First case: if there is  no predecessor, it means that the path is the node itself.
    if the_predecessors[node] is None:
        return str(node)

    # If there is a predecessor, with merge the path to the predecessor with the current node.
    return f'{shortest_path(node=the_predecessors[node], the_predecessors=the_predecessors)} -> {str(node)}'



Print the shortest paths

In [None]:
# One line per node: the path obtained by following the predecessors back from that node.
for node in second_network.nodes:
    print(shortest_path(node=node, the_predecessors=predecessors))


We create and plot the shortest path graph

In [None]:
# The graph contains the same nodes, at the same positions, as the network.
shortest_path_tree: DiGraph = DiGraph()
for node, coordinates in positions.items():
    shortest_path_tree.add_node(node, pos=coordinates)

# Each node with a predecessor contributes one arc, from its predecessor to itself.
shortest_path_arcs = [
    (previous_node, node_name)
    for node_name, previous_node in predecessors.items()
    if previous_node is not None
]
shortest_path_tree.add_edges_from(shortest_path_arcs)

fig, ax = plt.subplots(figsize=(8, 6))
draw_network(the_network=shortest_path_tree, ax=ax)
plt.show()


For future exercises, it is possible to perform the same tasks using the package.

Initialization

In [None]:
# Same network, cost attribute and origin as for the function implemented above.
the_algorithm = ShortestPathAlgorithm(
    the_network=second_network, the_cost_name='cost', the_origin='a'
)


Running the algorithm

In [None]:
# NOTE(review): presumably the package counterpart of the function implemented above; verify in the package.
the_algorithm.shortest_path_algorithm()


Printing the shortest paths

In [None]:
# NOTE(review): the returned value is stored but not displayed here; check whether the method prints the paths.
the_shortest_paths = the_algorithm.list_of_shortest_paths()


Plotting the results

In [None]:
# NOTE(review): presumably draws the same shortest path tree as the one built manually above; confirm.
the_algorithm.plot_shortest_path_tree()