In [1]:

# This file revises Algorithm 8 (warm-start calculation of all shortest paths)
# for solving the K shortest path problem.
# See https://en.wikipedia.org/wiki/K_shortest_path_routing

# The k shortest path problem is a generalization of the shortest path problem. 
# It asks not only about a shortest path but also about next k−1 shortest paths
# (which may be longer than the shortest path).


In [2]:
exec(open("./funcs/tool_funcs.py").read())



In [3]:
def dijkstra_show_path(adj_matrix, start, end):
 
    n = len(adj_matrix)
    distances = [float('inf')] * n
    previous_nodes = [-1] * n
    distances[start] = 0
    priority_queue = [(0, start)]  # (distance, node)

    while priority_queue:
        current_distance, current_node = heapq.heappop(priority_queue)

        # Stop if we reached the destination node
        if current_node == end:
            break

        # If the distance is no longer optimal, skip
        if current_distance > distances[current_node]:
            continue

        # Explore neighbors
        for neighbor, weight in enumerate(adj_matrix[current_node]):
            if weight > 0:  # Only consider edges with weight > 0
                distance = current_distance + weight

                # Update distance if it's better
                if distance < distances[neighbor]:
                    distances[neighbor] = distance
                    previous_nodes[neighbor] = current_node
                    heapq.heappush(priority_queue, (distance, neighbor))

    # Reconstruct the path from end to start
    path = []
    current = end
    while current != -1:
        path.append(current)
        current = previous_nodes[current]
    path.reverse()

    # If the start node isn't reachable, return an empty path
    return path if path[0] == start else []

In [4]:
def translate_path(path, candidate_node_list):
    new_path = [candidate_node_list[i] for i in path]
    return new_path




In [5]:
def output_small_matrix(X_distance_matrix, candidate_node_list):
    small_matrix = np.zeros((len(candidate_node_list), len(candidate_node_list)))
    for i, ii in enumerate(candidate_node_list):
        for j, jj in enumerate(candidate_node_list):
            small_matrix[i,j] = X_distance_matrix[ii, jj]
    return small_matrix




In [6]:
# APPD stands for all points path distance (APPD).

def check_shortest_path(i,j, X_distance_matrix, APPD_matrix, path):
    temp1= path[:-1]
    temp2= path[1:]
    llll = [X_distance_matrix[temp1[i],temp2[i]] for i in range(len(temp1))]
    ooo = np.sum(llll)
    return ooo == APPD_matrix[i,j]

In [7]:
def compare_two_list(a, b):
    return len(a)==len(b) and len(a)==sum([1 for i,j in zip(a,b) if i==j])
 

def check_if_temp_path_already_in_not_using_set(candidate_paths_list, temp_path):
#     aa = set(temp_path)
#     for bb in candidate_paths_list:
#         bb = set(bb)
#         if aa == bb:
#             return True
    for bb in candidate_paths_list:
        if compare_two_list(temp_path, bb):
            return True
    return False


In [8]:
def cal_path_length(X_distance_matrix, path):
    temp1= path[:-1]
    temp2= path[1:]
    llll = [X_distance_matrix[temp1[i],temp2[i]] for i in range(len(temp1))]
    ooo = np.sum(llll)
    return ooo

In [9]:
def cal_candidate_node_list_slack(i, j, shortest_path_matrix, X_distance_matrix, threshold):
    
    N = len(X_distance_matrix)
    
    remaining_list = [k for k in np.arange(N) if k != i and k != j]
    candidate_node_list = []
    candidate_node_list.append(i)
    for t in remaining_list:
#         if shortest_path_matrix[i,j] == shortest_path_matrix[i,t] + shortest_path_matrix[t,j]:
#             candidate_node_list.append(t)
        if shortest_path_matrix[i,t] + shortest_path_matrix[t,j] <= threshold:
            candidate_node_list.append(t)


    candidate_node_list.append(j)
 
    return candidate_node_list

def dijkstra_cal_previous_nodes(adj_matrix, start):
 
    n = len(adj_matrix)
    distances = [float('inf')] * n
    previous_nodes = [-1] * n
    distances[start] = 0
    priority_queue = [(0, start)]  

    while priority_queue:
        current_distance, current_node = heapq.heappop(priority_queue)

        if current_distance > distances[current_node]:
            continue
 
        for neighbor, weight in enumerate(adj_matrix[current_node]):
            if weight > 0:  
                distance = current_distance + weight
 
                if distance < distances[neighbor]:
                    distances[neighbor] = distance
                    previous_nodes[neighbor] = current_node
                    heapq.heappush(priority_queue, (distance, neighbor))
 
    return previous_nodes

def dijkstra_show_path_from_previous_nodes(previous_nodes, start, end): 
    path = []
    current = end
    while current != -1:
        path.append(current)
        current = previous_nodes[current]
    path.reverse()
    return path if path[0] == start else []

def cal_candidate_paths_of_K_shortest_path(i, j, shortest_path_matrix, X_distance_matrix, threshold):
    candidate_node_list = cal_candidate_node_list_slack(i, j, shortest_path_matrix, X_distance_matrix, threshold)
 
    small_matrix = output_small_matrix(X_distance_matrix, candidate_node_list)
    path = dijkstra_show_path(small_matrix, 0, len(candidate_node_list) - 1)

    candidate_paths_list = []
    candidate_paths_list.append(path)

    K = len(candidate_node_list)
    # temp_list = [q for q in range(K) if q not in path]
    
    temp_list = [q for q in range(1, K - 1)]
 
    previous_nodes = dijkstra_cal_previous_nodes(small_matrix, 0)
 
    for m in temp_list:
        temp_path1 = dijkstra_show_path_from_previous_nodes(previous_nodes, 0, m)
        
#         temp_path1 = dijkstra_show_path(small_matrix, 0, m)
        temp_path2 = dijkstra_show_path(small_matrix, m, K - 1)
        temp_path =  temp_path1[:-1] + temp_path2
        if not check_if_temp_path_already_in_not_using_set(candidate_paths_list, temp_path):
            candidate_paths_list.append(temp_path)

    candidate_paths_list = [translate_path(ppp, candidate_node_list) for ppp in candidate_paths_list]    

    return candidate_paths_list

In [10]:
def already_in_temp_path_len_list(a, temp_path_len_list):
    return np.any(np.isclose(a, temp_path_len_list))


def cal_K_shortest_paths_warm(i, j, shortest_path_matrix, X_distance_matrix, K):
    N = len(X_distance_matrix)
    remaining_list = [k for k in np.arange(N) if k != i and k != j]
    temp_path_len_list = []
    temp_path_len_list.append(shortest_path_matrix[i,j])
    for yyy in remaining_list:
        ttt = shortest_path_matrix[i,yyy] + shortest_path_matrix[yyy,j]
        if not already_in_temp_path_len_list(ttt, temp_path_len_list):
            temp_path_len_list.append(ttt)  
    temp_path_len_list = np.sort(temp_path_len_list)
 
    if K <= len(temp_path_len_list):
        threshold = temp_path_len_list[K - 1]
    else:
        threshold = temp_path_len_list[-1]

    candidate_paths_list = cal_candidate_paths_of_K_shortest_path(i, j, shortest_path_matrix, X_distance_matrix, threshold)

    candidate_path_len_list = []
 

    for path_i in candidate_paths_list:
        aaa = cal_path_length(X_distance_matrix, path_i)
        candidate_path_len_list.append(aaa)
        
    K_shortest_path = []

    for ii in np.argsort(candidate_path_len_list)[:K]:
        K_shortest_path.append(candidate_paths_list[ii])
        
    return K_shortest_path


In [11]:
# Yen's algorithm implemented by deepseek

def yen_algorithm_deepseek(adj_matrix, source, target, K):
    def dijkstra_shortest_path(adj_matrix, source, target, excluded_edges=set()):
        n = len(adj_matrix)
        dist = [float('inf')] * n
        dist[source] = 0
        prev = [None] * n
        heap = [(0, source)]

        while heap:
            current_dist, u = heapq.heappop(heap)

            if u == target:
                break

            for v in range(n):
                if adj_matrix[u][v] != 0 and (u, v) not in excluded_edges:
                    alt = current_dist + adj_matrix[u][v]
                    if alt < dist[v]:
                        dist[v] = alt
                        prev[v] = u
                        heapq.heappush(heap, (alt, v))

        path = []
        u = target
        while prev[u] is not None:
            path.insert(0, u)
            u = prev[u]
        if path:
            path.insert(0, source)
        return path, dist[target]

    def get_path_edges(path):
        edges = set()
        for i in range(len(path) - 1):
            edges.add((path[i], path[i + 1]))
        return edges

    A = []
    B = []

    # Find the shortest path
    path, length = dijkstra_shortest_path(adj_matrix, source, target)
    if not path:
        return A
    A.append((path, length))

    for k in range(1, K):
        for i in range(len(A[-1][0]) - 1):
            spur_node = A[-1][0][i]
            root_path = A[-1][0][:i + 1]

            excluded_edges = set()
            for path, _ in A:
                if len(path) > i and root_path == path[:i + 1]:
                    excluded_edges.add((path[i], path[i + 1]))

            spur_path, spur_length = dijkstra_shortest_path(adj_matrix, spur_node, target, excluded_edges)

            if spur_path:
                total_path = root_path[:-1] + spur_path
                total_length = sum(adj_matrix[total_path[j]][total_path[j + 1]] for j in range(len(total_path) - 1))
                B.append((total_path, total_length))

        if not B:
            break

        B.sort(key=lambda x: x[1])
        A.append(B.pop(0))

    return [path for path, _ in A]

In [12]:
import heapq
import numpy as np
from collections import defaultdict
from copy import deepcopy

def dijkstra(adj_matrix, source, target):
    n = len(adj_matrix)
    distances = {i: float('inf') for i in range(n)}
    predecessors = {i: None for i in range(n)}
    distances[source] = 0
    pq = [(0, source)]
    
    while pq:
        curr_distance, u = heapq.heappop(pq)
        if curr_distance > distances[u]:
            continue
        if u == target:
            break
        for v in range(n):
            if adj_matrix[u][v] > 0:  # There is an edge
                new_distance = curr_distance + adj_matrix[u][v]
                if new_distance < distances[v]:
                    distances[v] = new_distance
                    predecessors[v] = u
                    heapq.heappush(pq, (new_distance, v))
    
    path = []
    node = target
    while node is not None:
        path.insert(0, node)
        node = predecessors[node]
    
    return path if path[0] == source else None

# Yen's algorithm implemented by chatGPT
def yen_k_shortest_paths_chatGPT(adj_matrix, source, target, K):
    A = []  # List of shortest paths
    B = []  # Candidate paths
    
    # First shortest path using Dijkstra
    first_path = dijkstra(adj_matrix, source, target)
    if first_path is None:
        return A
    A.append(first_path)
    
    for k in range(1, K):
        for i in range(len(A[k - 1]) - 1):
            spur_node = A[k - 1][i]
            root_path = A[k - 1][:i + 1]
            
            modified_graph = deepcopy(adj_matrix)
            
            # Remove the root_path edges
            for path in A:
                if len(path) > i and path[:i + 1] == root_path:
                    modified_graph[path[i]][path[i + 1]] = 0
            
            # Remove root path nodes except spur node
            for node in root_path[:-1]:
                for v in range(len(adj_matrix)):
                    modified_graph[node][v] = 0
                    modified_graph[v][node] = 0
            
            spur_path = dijkstra(modified_graph, spur_node, target)
            if spur_path is not None:
                total_path = root_path[:-1] + spur_path
                if total_path not in B:
                    heapq.heappush(B, (sum(adj_matrix[total_path[j]][total_path[j + 1]] for j in range(len(total_path) - 1)), total_path))
        
        if B:
            A.append(heapq.heappop(B)[1])
        else:
            break
    
    return A

 



In [13]:
# N = 1000

# X_distance_matrix = create_distance_matrix(N)



In [14]:
# X_distance_matrix = [[ 0., 3, 2, np.inf, np.inf, np.inf],
#        [np.inf,  0., np.inf, 4, np.inf, np.inf],
#        [np.inf, 1,  0., 2, 3, np.inf],
#        [np.inf, np.inf, np.inf,  0., 2, 1],
#        [np.inf, np.inf, np.inf, np.inf,  0., 2],
#        [np.inf, np.inf, np.inf, np.inf, np.inf,  0.]]

# X_distance_matrix = np.array(X_distance_matrix).astype('float64')

In [15]:

X_distance_matrix = np.loadtxt("./data/X_100_distance_matrix.csv", delimiter=",")


In [16]:

shortest_path_matrix = floyd_warshall(X_distance_matrix)




i = 0
j = 50


K = 7
 
shortest_path = dijkstra_show_path(X_distance_matrix, i, j)

aaa = cal_path_length(X_distance_matrix, shortest_path)
print(f"Shortest path: {shortest_path}, shortest path length: {aaa}")

check_shortest_path(i,j, X_distance_matrix, shortest_path_matrix, shortest_path)


Shortest path: [0, 76, 13, 95, 17, 9, 50], shortest path length: 57.0


True

In [17]:

 

start = time.time()
K_shortest_path_warm = cal_K_shortest_paths_warm(i, j, shortest_path_matrix, X_distance_matrix, K)
end = time.time()


time_used = end - start
time_used = np.round(time_used, 3)
time_used

0.009

In [18]:
# def remove_duplicated_paths(K_shortest_path):
#     K_shortest_path_new = []
 
#     for temp_path in K_shortest_path:
 
#         if not check_if_temp_path_already_in_not_using_set(K_shortest_path_new, temp_path):
#             K_shortest_path_new.append(temp_path)
#     return K_shortest_path_new



In [19]:

for path_i in K_shortest_path_warm:
    aaa = cal_path_length(X_distance_matrix, path_i)
    print(f"path: {path_i}, path length: {aaa}")

path: [0, 76, 13, 95, 17, 9, 50], path length: 57.0
path: [0, 76, 12, 85, 15, 50], path length: 69.0
path: [0, 76, 88, 9, 50], path length: 75.0
path: [0, 76, 13, 56, 88, 9, 50], path length: 78.0
path: [0, 76, 12, 85, 3, 85, 15, 50], path length: 81.0
path: [0, 76, 12, 38, 15, 50], path length: 82.0
path: [0, 76, 28, 56, 88, 9, 50], path length: 83.0


In [20]:


start = time.time()
K_shortest_path_Yen = yen_algorithm_deepseek(X_distance_matrix, i, j, K)
end = time.time()


time_used = end - start
time_used = np.round(time_used, 3)
time_used



0.22

In [21]:

for path_i in K_shortest_path_Yen:
    aaa = cal_path_length(X_distance_matrix, path_i)
    print(f"path: {path_i}, path length: {aaa}")

path: [0, 76, 13, 95, 17, 9, 50], path length: 57.0
path: [0, 76, 12, 85, 15, 50], path length: 69.0
path: [0, 76, 88, 9, 50], path length: 75.0
path: [0, 76, 13, 56, 88, 9, 50], path length: 78.0
path: [0, 76, 12, 38, 15, 50], path length: 82.0
path: [0, 76, 28, 56, 88, 9, 50], path length: 83.0
path: [0, 76, 28, 56, 88, 9, 50], path length: 83.0


In [22]:

start = time.time()
K_shortest_path_Yen_gpt = yen_k_shortest_paths_chatGPT(X_distance_matrix, i, j, K)
end = time.time()


time_used = end - start
time_used = np.round(time_used, 3)
time_used

0.17

In [23]:

for path_i in K_shortest_path_Yen_gpt:
    aaa = cal_path_length(X_distance_matrix, path_i)
    print(f"path: {path_i}, path length: {aaa}")

path: [0, 76, 13, 95, 17, 9, 50], path length: 57.0
path: [0, 76, 12, 85, 15, 50], path length: 69.0
path: [0, 76, 88, 9, 50], path length: 75.0
path: [0, 76, 13, 56, 88, 9, 50], path length: 78.0
path: [0, 76, 12, 38, 15, 50], path length: 82.0
path: [0, 76, 13, 55, 88, 9, 50], path length: 83.0
path: [0, 76, 28, 56, 88, 9, 50], path length: 83.0


In [24]:

# The results are not consistent, 
# which means the correctness of the calculated K shortest paths is not guaranteed. 

