# Dijkstra's algorithm

https://en.wikipedia.org/wiki/Dijkstra%27s_algorithm


Each node of the graph is stored as one record of the form:

(city, ([(neighbor, weight), ...], current_distance, visited, path))

In [97]:
import networkx as nx
import random
import string
from pyspark import SparkContext

---
### **Utils definition**

**Printing**

In [98]:
def print_initial_state(prepared_data):
    """
    Display the prepared graph records as an aligned text table.

    One row is written per record, in sorted order, showing the node,
    its starting distance ("Inf" when infinite), its visited flag and
    its outgoing (destination, weight) pairs ("None" when it has none).
    """
    heavy_rule = "=" * 95
    light_rule = "-" * 95

    # Table banner and column titles
    print(f"\n{heavy_rule}")
    print(f"{'NODE':<6} | {'INIT DIST':<10} | {'VISITED':<10} | {'NEIGHBORS (DESTINATION, WEIGHT)'}")
    print(light_rule)

    for record in sorted(prepared_data):
        # The stored path is not part of this table
        label, (edges, distance, seen, _route) = record

        if distance == float("inf"):
            distance_cell = "Inf"
        else:
            distance_cell = f"{distance:.1f}"

        if edges:
            edges_cell = ", ".join(f"({target}, {cost})" for target, cost in edges)
        else:
            edges_cell = "None"

        print(f"{label:<6} | {distance_cell:<10} | {str(seen):<10} | {edges_cell}")

    print(f"{heavy_rule}\n")

In [99]:
def print_final_results(results):
    """
    Print a table with the shortest distance and the path found for each node.

    Args:
        results: iterable of ``(node, (neighbors, distance, visited, path))``
            records. ``distance`` is shown with one decimal, or as "Inf"
            when it is infinite. ``path`` is the sequence of nodes leading
            to ``node``; an empty path is shown as "N/A".

    Node identifiers do not have to be strings: every hop of the path is
    converted with ``str()`` before joining.
    """
    border = "=" * 95

    print(f"\n{border}")
    print(f"{'NODE':<10} | {'DISTANCE':<12} | {'OPTIMAL PATH'}")
    print("-" * 95)

    # Sorting ensures nodes appear as A, B, C...
    for id_n, (_, dist, _, path) in sorted(results):
        dist_str = "Inf" if dist == float('inf') else f"{dist:.1f}"
        # str.join only accepts strings, so stringify each hop; without this
        # a graph with integer node ids raises TypeError here.
        path_str = " -> ".join(str(step) for step in path) if path else "N/A"
        print(f"{id_n:<10} | {dist_str:<12} | {path_str}")

    print(f"{border}\n")

**Graph Generation**

In [100]:
# Example graph
def example_graph():
    """
    Build the small hand-written sample network used for the demo run.

    Returns a directed networkx graph with eleven nodes ("A" to "K") and
    seventeen edges, each carrying its cost in the ``weight`` attribute.
    """
    # source -> ((target, weight), ...). Dict order is preserved, so edges
    # are added source by source, in the order they are listed here.
    outgoing = {
        "A": (("B", 3.0), ("C", 10.0), ("E", 4.0)),
        "B": (("C", 2.0), ("D", 8.0), ("F", 7.0)),
        "C": (("D", 5.0), ("G", 3.0)),
        "D": (("H", 6.0),),
        "E": (("F", 2.0), ("I", 9.0)),
        "F": (("G", 1.0), ("J", 5.0)),
        "G": (("H", 2.0), ("K", 4.0)),
        "I": (("J", 3.0),),
        "J": (("K", 6.0),),
    }

    graph = nx.DiGraph()
    for source, targets in outgoing.items():
        graph.add_weighted_edges_from(
            [(source, target, cost) for target, cost in targets]
        )
    return graph

# Data Preprocessing for Graph
def data_preprocesing(G, initial_node):
    """
    Convert a weighted directed graph into the record list handed to Spark.

    Each node becomes ``(node, (neighbors, distance, visited, path))``:
    ``neighbors`` lists ``(successor, weight)`` pairs read from the edge
    attribute ``"weight"``; ``visited`` starts as False. The start node
    gets distance 0 and a path containing only itself, every other node
    gets an infinite distance and an empty path.
    """

    def to_record(node):
        # Outgoing edges of this node together with their cost
        out_edges = [
            (succ, G.edges[node, succ]["weight"]) for succ in G.successors(node)
        ]
        is_start = node == initial_node
        distance = 0 if is_start else float("inf")
        route = [initial_node] if is_start else []
        return (node, (out_edges, distance, False, route))

    return [to_record(node) for node in G.nodes()]

In [101]:
# Random Graph Generator
def _node_label(index):
    """Return a spreadsheet-style label: 0 -> 'A', 25 -> 'Z', 26 -> 'AA', ..."""
    label = ""
    index += 1
    while index > 0:
        index, remainder = divmod(index - 1, 26)
        label = string.ascii_uppercase[remainder] + label
    return label


def generate_random_graph(n, prob, weight_range, seed=42):
    """
    Create a random directed graph with weighted edges.

    Args:
        n: number of nodes. Nodes are labelled "A".."Z" and, beyond 26
            nodes, "AA", "AB", ... so any size is supported.
        prob: probability of creating each possible edge.
        weight_range: ``(low, high)`` pair of integers; every edge weight is
            drawn uniformly from that inclusive range and stored as a float.
        seed: seed for both the graph structure and the edge weights, so the
            same arguments always give the same graph. ``None`` gives a
            different graph on each call.

    Returns:
        A networkx directed graph whose edges carry a ``weight`` attribute.
    """
    graph = nx.fast_gnp_random_graph(n, prob, directed=True, seed=seed)

    # Mapping node name to letters
    mapping = {i: _node_label(i) for i in range(n)}
    graph = nx.relabel_nodes(graph, mapping)

    # Draw weights from a generator tied to `seed`; the module-level `random`
    # functions are never seeded here, so using them would make the weights
    # change from run to run even with a fixed seed.
    if isinstance(seed, random.Random):
        rng = seed
    elif seed is None or isinstance(seed, int):
        rng = random.Random(seed)
    else:
        # Seed objects networkx accepts but random.Random may reject
        # (e.g. a numpy RandomState): fall back to the global generator.
        rng = random

    low, high = weight_range
    for u, v in graph.edges():
        graph.edges[u, v]['weight'] = float(rng.randint(low, high))

    return graph

Interpretation:
('A', ([('B', 3.0), ('C', 10.0), ('E', 4.0)], inf, False, []))

Cost
A -> B 3€
  -> E 4€
  -> C 10€

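`inf` is the initial distance of every node that has not been reached yet; the start node is the exception and begins with distance 0.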

**DIJKSTRA Algorithm**

In [102]:
from networkx.classes.function import neighbors
# DIJKSTRA Algorithm
# Module-level SparkContext used by dijkstra_spark. getOrCreate() returns the
# context that is already running, if any, instead of starting a second one.
sc = SparkContext.getOrCreate()

def dijkstra_spark(initial_data):
    """
    Run Dijkstra's single-source shortest-path algorithm on a Spark RDD.

    Args:
        initial_data: list of ``(node, (neighbors, distance, visited, path))``
            records, where ``neighbors`` is a list of ``(neighbor, weight)``
            pairs. The source node must have distance 0 and every other node
            an infinite distance.

    Returns:
        A list of records in the same format, holding for each node the
        shortest distance found and the path that achieves it. Nodes that
        cannot be reached from the source keep an infinite distance and
        their initial path.

    Note:
        As with any Dijkstra implementation, the result is only guaranteed
        to be correct when all edge weights are non-negative.
    """
    rdd = sc.parallelize(initial_data)
    num_nodes = len(initial_data)

    # Each pass settles exactly one node, so num_nodes passes are enough.
    for _ in range(num_nodes):

        # Closest node that is reachable (finite distance) and not settled.
        # takeOrdered(1) answers both "is there one?" and "which one?" in a
        # single Spark job and returns [] when nothing qualifies.
        closest = rdd.filter(
            lambda x: not x[1][2] and x[1][1] != float('inf')
        ).takeOrdered(1, key=lambda x: x[1][1])

        if not closest:
            break  # every reachable node has already been settled

        current_id, (current_edges, current_dist, _, current_path) = closest[0]

        def update_node(node_data):
            """Settle the current node, or relax the edge leading to this node."""
            id_n, (neighbors, dist, visited, path) = node_data

            # Set node as visited
            if id_n == current_id:
                return (id_n, (neighbors, dist, True, path))

            # Relax using the cheapest edge current -> id_n. Taking the min
            # keeps the result correct even if the neighbor list contains
            # several entries for the same destination.
            edge_weights = [
                weight for nbr, weight in current_edges if nbr == id_n
            ]
            if edge_weights:
                new_dist = current_dist + min(edge_weights)
                if new_dist < dist:
                    return (
                        id_n,
                        (neighbors, new_dist, visited, current_path + [id_n]),
                    )

            return node_data

        # Cache and materialize the new generation so later passes do not
        # recompute the whole chain of maps, then release the previous
        # generation; otherwise one cached copy per pass would pile up.
        updated = rdd.map(update_node).cache()
        updated.count()
        rdd.unpersist()
        rdd = updated

    result = rdd.collect()
    rdd.unpersist()  # the cached data is not needed after collecting
    return result

---
### **Main Execution**

In [103]:
# Build a random 10-node directed graph (edge probability 0.2, whole-number
# weights between 1 and 15) and convert it to Spark records with "A" as the
# source node
grafo_nx = generate_random_graph(n=10, prob=0.2, weight_range=(1, 15))
pyspark_data = data_preprocesing(grafo_nx, "A")

# Show the graph structure and the starting distances
print_initial_state(pyspark_data)

# Run Dijkstra on Spark
results = dijkstra_spark(pyspark_data)

# Show the shortest distance and path found for every node
print_final_results(results)


NODE   | INIT DIST  | VISITED    | NEIGHBORS (DESTINATION, WEIGHT)
-----------------------------------------------------------------------------------------------
A      | 0.0        | False      | (G, 13.0), (H, 12.0), (J, 4.0)
B      | Inf        | False      | (D, 4.0), (E, 12.0)
C      | Inf        | False      | (D, 4.0), (J, 3.0)
D      | Inf        | False      | (E, 1.0), (I, 2.0), (A, 11.0), (C, 9.0)
E      | Inf        | False      | (I, 11.0), (D, 7.0)
F      | Inf        | False      | None
G      | Inf        | False      | (J, 5.0), (C, 8.0), (D, 9.0)
H      | Inf        | False      | (I, 12.0), (J, 10.0), (F, 14.0)
I      | Inf        | False      | (J, 15.0), (E, 10.0), (G, 14.0), (H, 6.0)
J      | Inf        | False      | None


NODE       | DISTANCE     | OPTIMAL PATH
-----------------------------------------------------------------------------------------------
A          | 0.0          | A
B          | Inf          | N/A
C          | 21.0         | A -> G -> C
D 

In [104]:
# Load the hand-written example graph and convert it to Spark records with
# "A" as the source node
grafo_ejemplo_nx = example_graph()
pyspark_example_data = data_preprocesing(grafo_ejemplo_nx, "A")

# Show the graph structure and the starting distances
print_initial_state(pyspark_example_data)

# Run Dijkstra on Spark
results = dijkstra_spark(pyspark_example_data)


# Show the shortest distance and path found for every node
print_final_results(results)


NODE   | INIT DIST  | VISITED    | NEIGHBORS (DESTINATION, WEIGHT)
-----------------------------------------------------------------------------------------------
A      | 0.0        | False      | (B, 3.0), (C, 10.0), (E, 4.0)
B      | Inf        | False      | (C, 2.0), (D, 8.0), (F, 7.0)
C      | Inf        | False      | (D, 5.0), (G, 3.0)
D      | Inf        | False      | (H, 6.0)
E      | Inf        | False      | (F, 2.0), (I, 9.0)
F      | Inf        | False      | (G, 1.0), (J, 5.0)
G      | Inf        | False      | (H, 2.0), (K, 4.0)
H      | Inf        | False      | None
I      | Inf        | False      | (J, 3.0)
J      | Inf        | False      | (K, 6.0)
K      | Inf        | False      | None


NODE       | DISTANCE     | OPTIMAL PATH
-----------------------------------------------------------------------------------------------
A          | 0.0          | A
B          | 3.0          | A -> B
C          | 5.0          | A -> B -> C
D          | 10.0         | A -> B 