Copyright **`(c)`** 2025 Giovanni Squillero `<giovanni.squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free under certain conditions — see the [`license`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  

In [107]:
from itertools import product, combinations
import numpy as np
import networkx as nx
import heapq
from icecream import ic
from tqdm import tqdm

In [108]:
# added map to use with euclidean heuristic in A*
def create_problem(
    size: int,
    *,
    density: float = 1.0,
    negative_values: bool = False,
    noise_level: float = 0.0,
    seed: int = 42,
) -> tuple[np.ndarray, np.ndarray]:
    """Problem generator for Lab3, returns (problem_matrix, map)."""
    rng = np.random.default_rng(seed)
    map = rng.random(size=(size, 2))
    problem = rng.random((size, size))
    if negative_values:
        problem = problem * 2 - 1
    problem *= noise_level
    for a, b in product(range(size), repeat=2):
        if rng.random() < density:
            problem[a, b] += np.sqrt(
                np.square(map[a, 0] - map[b, 0]) + np.square(map[a, 1] - map[b, 1])
            )
        else:
            problem[a, b] = np.inf
    np.fill_diagonal(problem, 0)
    return (problem * 1_000).round(), map * 1_000

In [109]:
problem = create_problem(10, density=0.15, noise_level=10, negative_values=True)[0]

In [110]:
masked = np.ma.masked_array(problem, mask=np.isinf(problem))
G = nx.from_numpy_array(masked, create_using=nx.DiGraph)

### Problems initialization

In [111]:
# Problems
size = [10, 20, 50, 100]
density = [0.2, 0.5, 0.8, 1.0]
noise_level = [0.0, 1.0, 0.5, 0.8]

# generate all combinations of problems with non negative values for A*
problems_non_negative = []
comb = list(product(size, density, noise_level))
for size, density, noise in tqdm(comb, desc='Generating problems with non negative values',):
    prob = create_problem(
        size,
        density=density,
        noise_level=noise,
        negative_values=False,
        seed=42,
    )
    problems_non_negative.append(prob)

# generate all combinations of problems with negative values
problems_negative = []
for size, density, noise in tqdm(comb, desc='Generating problems with negative values',
):
    prob = create_problem(
        size,
        density=density,
        noise_level=noise,
        negative_values=True,
        seed=42,
    )
    problems_negative.append(prob)

Generating problems with non negative values: 100%|██████████| 64/64 [00:00<00:00, 282.42it/s]
Generating problems with negative values: 100%|██████████| 64/64 [00:00<00:00, 300.95it/s]


### A* with Euclidean distance heuristic - w/o negative values

In [112]:
def path_search_astar(problem: tuple[np.ndarray, np.ndarray], start: int, goal: int) -> tuple[list[int] | None, float]:
    prob_matrix = problem[0] # get value matrix
    coords = problem[1] # get coordinates for activation function
    n = prob_matrix.shape[0] # get number of cities
    if start < 0 or start >= n or goal < 0 or goal >= n:
        raise IndexError('start/goal out of range')
    # avoid simple case
    if start == goal:
        return [start], 0.0
    
    # check for only non-negative edge weights
    finite_mask = np.isfinite(prob_matrix)
    if np.any(prob_matrix[finite_mask] < 0):
        raise ValueError('A* requires non-negative edge weights')
    
    # heuristic: euclidean distance between cities
    h = lambda u: float(np.linalg.norm(coords[u] - coords[goal]))

    # frontier: heap of (f_cost, g_cost, city)
    # f_cost = g_cost + h_cost
    # g_score: cost from start to current city
    frontier = []
    heapq.heappush(frontier, (h(start), 0.0, start))
    # array to reconstruct path
    previous_city = [-1] * n
    g_cost = [np.inf] * n
    g_cost[start] = 0.0
    visited = [False] * n
    while frontier:
        f, g, current = heapq.heappop(frontier)
        # skip if we have already found a better path to current
        if g > g_cost[current]:
            continue
        if current == goal:
            # reconstruct path
            path = []
            u = current
            while u != -1:
                path.append(u)
                u = previous_city[u]
            path.reverse()
            return path, float(g_cost[goal])
        # mark current as visited
        visited[current] = True
        # expand neighbors cities
        for neighbor in range(n):
            w = prob_matrix[current, neighbor]
            # skip visited or non-edges
            if visited[neighbor] or not np.isfinite(w):
                continue
            candidate_g = g_cost[current] + float(w)
            if candidate_g < g_cost[neighbor]:
                g_cost[neighbor] = candidate_g
                previous_city[neighbor] = current
                # push new candidate to frontier
                heapq.heappush(frontier, (candidate_g + h(neighbor), candidate_g, neighbor))
    # no path found
    return None, np.inf

### Bellman Ford

In [113]:
def path_search_bellman_ford(problem: np.ndarray, start: int, goal: int) -> tuple[list[int] | None, float]:
    n = problem.shape[0]
    if start < 0 or start >= n or goal < 0 or goal >= n:
        raise IndexError('start/goal out of range')
    # avoid simple case
    if start == goal:
        return [start], 0.0
    
    # initialize distances and predecessors
    g_cost = [np.inf] * n
    g_cost[start] = 0.0
    previous_city = [-1] * n

    # relax edges up to n-1 times
    for _ in range(n - 1):
        for i in range(n):
            for j in range(n):
                w = problem[i, j]
                if np.isfinite(w):
                    if g_cost[i] + w < g_cost[j]:
                        g_cost[j] = g_cost[i] + w
                        previous_city[j] = i

    # check for negative-weight cycles
    for i in range(n):
        for j in range(n):
            w = problem[i, j]
            if np.isfinite(w):
                if g_cost[i] + w < g_cost[j]:
                    raise ValueError('Graph contains a negative-weight cycle')

    # reconstruct path
    if g_cost[goal] == np.inf:
        return None, np.inf
    else:
        path = []
        u = goal
        while u != -1:
            path.append(u)
            u = previous_city[u]
        path.reverse()
        return path, float(g_cost[goal])

In [None]:
# compute and store shortest paths for non-negative values problem using path_search_astar function
shortest_paths = {}
for idx, prob in enumerate(tqdm(problems_non_negative, desc='Solving problems', unit='prob')):
    n = prob[0].shape[0]
    shortest_paths[idx] = {}
    pairs = list(combinations(range(n), 2))
    for s, d in tqdm(pairs, desc=f'Problem {idx} pairs', unit='pair', leave=False):
        path, cost = path_search_astar(prob,s,d)
        shortest_paths[idx][(s, d)] = (path, cost)

Solving problems: 100%|██████████| 64/64 [00:41<00:00,  1.54prob/s] 


In [None]:
# compute and store shortest paths for non-negative values problem using bellman ford
shortest_paths_bf = {}
for idx, prob in enumerate(tqdm(problems_negative, desc='Solving problems with Bellman Ford', unit='prob')):
    n = prob[0].shape[0]
    shortest_paths_bf[idx] = {}
    pairs = list(combinations(range(n), 2))
    for s, d in tqdm(pairs, desc=f'Problem {idx} pairs', unit='pair', leave=False):
        try:
            path, cost = path_search_bellman_ford(prob[0],s,d)
        except ValueError:
            path, cost = None, np.inf
        shortest_paths_bf[idx][(s, d)] = (path, cost)

Solving problems with Bellman Ford:  75%|███████▌  | 48/64 [14:35<15:04, 56.51s/prob]

### Save results

In [None]:
#combine all results
shortest_paths_combined = {
    **shortest_paths,
    **shortest_paths_bf
}
# save results to csv(size, density, noise_level, start, goal, astar path, astar cost, bellman ford path, bellman ford cost)
def save_csv_results(filename: str):
    import csv
    with open(filename, mode='w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(['size', 'density', 'noise_level', 'start', 'goal', 'astar_path', 'astar_cost', 'bellman_ford_path', 'bellman_ford_cost'])
        for idx in shortest_paths_combined.keys():
            size_val, density_val, noise_val = comb[idx]
            for (s, d), (astar_res, bf_res) in shortest_paths_combined[idx].items():
                astar_path, astar_cost = astar_res
                bf_path, bf_cost = bf_res
                writer.writerow([size_val, density_val, noise_val, s, d, astar_path, astar_cost, bf_path, bf_cost])

save_csv_results('results/shortest_paths_results.csv')