Copyright **`(c)`** 2025 Giovanni Squillero `<giovanni.squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free under certain conditions — see the [`license`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  

In [1]:

import logging
from itertools import combinations

import numpy as np
import matplotlib.pyplot as plt
import networkx as nx

from icecream import ic

Cost: $d + (d \cdot \alpha \cdot w)^\beta$ with $\alpha \ge 0$ and $\beta \ge 0$

## Genetico

In [46]:
from importlib.resources import path
import numpy as np
import networkx as nx
import random

class GA_Solver:

    def __init__(self, problem, pop_size=50, generations=100, offprint=20):
        self.prob = problem
        self.graph = problem.graph
        # Tutte le città tranne il deposito
        self.cities_to_visit = [n for n in self.graph.nodes if n != 0]
        # Precalcolo cammini minimi
        self.all_paths = dict(nx.all_pairs_dijkstra_path(self.graph, weight='dist'))
        self.pop_size = pop_size
        self.generations = generations
        self.offprint = offprint

    def _calc_path_cost_constat_weight(self, path : list[int], weight: float) -> float:
        c = 0
        current_w = weight
        for u, v in zip(path, path[1:]):
            d = self.graph[u][v]['dist']
            # Formula del costo: d + (alpha * d * w)^beta
            c += d + (self.prob.alpha * d * current_w) ** self.prob.beta
        return c

    def _path_cost(self, path: list[tuple[int, float]]) -> float:
        total_cost = 0
        current_weight = 0
        for i in range(len(path)-1):
            u, v = path[i][0], path[i+1][0]
            d = self.graph[u][v]['dist']
            total_cost += d + (self.prob.alpha * d * current_weight) ** self.prob.beta
            if v==0:
                current_weight = 0  # Svuota il peso al deposito
            else:
                current_weight += path[i+1][1]  # aggiungo oro raccolto al peso
        return total_cost  
    
    def greedy_initialization(self)-> tuple[list[tuple[int, float]], float]:
        """Costruisce un individuo iniziale usando una strategia greedy."""

        if self.prob.beta >= 1.0 :
            # Se beta è alto, il costo del peso è molto penalizzante, quindi conviene fare più viaggi con carichi leggeri
            # potrebbe convenire addirittura portare poco oro per volta (prendere solo una parte dell'oro disponibile in ogni città) e fare più giri
            #print(f"Greedy initialization con beta alto: favorisco più viaggi con carichi leggeri. {self.prob.beta}")
            ind = list(self.cities_to_visit)
            random.shuffle(ind)
            chromo, cost= self._evaluate_and_segment(ind)
            chromo, cost= self.multiple_cycle(chromo)

        else:
            # Se beta è basso, il costo del peso è meno penalizzante, quindi conviene fare meno viaggi con carichi più pesanti:
            ind = list(self.cities_to_visit)
            random.shuffle(ind)
            chromo, cost= self._evaluate_and_segment(ind)

        return chromo, cost
    
    def multiple_cycle (self, chromo: list[tuple[int, float]])-> tuple[list[tuple[int, float]], float]:
        tours = []
        current_tour_cities = []
        #print(chromo)  
        # 1. Dividiamo il percorso originale in tour che partono e tornano a zero (escludendo gli 0)
        for n, w in chromo:
            if n == 0:
                if current_tour_cities:
                    #print("Tour trovato:", current_tour_cities)
                    tours.append(current_tour_cities)
                    current_tour_cities = []
            else:
                current_tour_cities.append((n,w))
        
        trips=[]

        for t in tours:
          closed_tour= [(0,0)]+ t+ [(0,0)]
          cost= self._path_cost( closed_tour)
          #print(closed_tour)
          # simulo il costo di fare più giri in uno stesso tour
          #print(f"costo tour {cost}")
          set_t= [ (c,w) for c,w in t if c!=0 and w!=0]
          
          best_trip=closed_tour

          for i in range(2, int (min(set_t, key=lambda x: x[1])[1] ) ):
              #print(f"Divido il carico per {i}")
              trip=[(0,0)]
              for j in range(i):
                # raccogliere tutto l'oro capire come fare se non è divisibile 
                r= [ 0 for _ in t]
                if j== i-1:
                    r= [ w % i for c, w in t] # se non è divisibile, aggiungo il resto all'ultimo giro
                trip.extend( [ (c, w//i + r) for (c, w), r in zip(t, r) ]) 
                trip.extend([(0,0)])
              cost_trip= self._path_cost( trip)       
              if cost_trip < cost and cost_trip>0:
                  cost= cost_trip
                  best_trip= trip
              else:
                  break
          #print (f"Best trip per questo tour: {best_trip} con costo {cost}")
          trips.extend(best_trip[:-1])  # escludiamo l'ultimo 0 che sarà aggiunto dal tour successivo
        
        trips.append((0,0))  # aggiungiamo l'ultimo ritorno al deposito alla fine di tutti i tour
        #print(f"Trips dopo ottimizzazione multi-cycle: {trips}")
        total_cost= self._path_cost( trips) 
        #print(f"Costo totale dopo ottimizzazione multi-cycle: {total_cost}")
        return trips, total_cost



    def _evaluate_and_segment(self, chromosome: list[int]) -> tuple[list[tuple[int, float]], float]:
        """Decoder Greedy: decide se tornare al deposito per scaricare."""
        route = []
        total_cost = 0
        current_node = 0
        current_weight = 0

        route.append((0, 0)) 
        for next_target in chromosome:
            # Opzione A: Diretto
            path_direct = self.all_paths[current_node][next_target]
            cost_direct = self._calc_path_cost_constat_weight(path_direct, current_weight)
            
            # Opzione B: Passando dal deposito
            path_to_depot = self.all_paths[current_node][0]
            path_from_depot = self.all_paths[0][next_target]
            cost_unload = (self._calc_path_cost_constat_weight(path_to_depot, current_weight) + 
                           self._calc_path_cost_constat_weight(path_from_depot, 0))
            
            if current_weight > 0 and cost_unload < cost_direct:
                # Scarico al deposito
                for v in path_to_depot[1:]:
                    route.append((v, 0))
                current_weight = 0
                # Vado al target
                for v in path_from_depot[1:]:
                    gold = self.graph.nodes[v].get('gold', 0) if v == next_target else 0
                    route.append((v, gold))
                    current_weight += gold
                total_cost += cost_unload
            else:
                # Vado diretto
                for v in path_direct[1:]:
                    gold = self.graph.nodes[v].get('gold', 0) if v == next_target else 0
                    route.append((v, gold))
                    current_weight += gold
                total_cost += cost_direct
            current_node = next_target

        # Ritorno finale
        path_home = self.all_paths[current_node][0]
        total_cost += self._calc_path_cost_constat_weight(path_home, current_weight)
        for node in path_home[1:]:
            route.append((node, 0))
            
        return route, total_cost

    def _create_individual(self) -> tuple[list[tuple[int, float]], float]:
        ind = list(self.cities_to_visit)
        random.shuffle(ind)
        return self._evaluate_and_segment(ind)



    def _mutate(self, ind: list[tuple[int, float]] ) -> tuple[list[tuple[int, float]], float]:
        """
        Mutation: 2 possibilità 
        1. Scambio casuale di due città nel percorso (swap mutation).
        2. Ottimizzazione locale: Scompone il percorso in tour (segmenti tra i depositi) e li ri-decodifica.
        """
        #print(ind)
        
        ratio= random.random()

        if ratio < 0.2:
            # OPZIONE 1:
            # Swap mutation
            chromo = set([n for n, g in ind if n != 0])
            chromo= list(chromo)  # rimuoviamo eventuali duplicati
            # Applichiamo uno swap casuale per diversità
            if len(chromo) >= 2:
                i, j = random.sample(range(len(chromo)), 2)
                chromo[i], chromo[j] = chromo[j], chromo[i]
            
            chromo, cost= self._evaluate_and_segment(chromo)
            if self.prob.beta >= 1.0:
                chromo, cost= self.multiple_cycle(chromo)
            return chromo, cost
        
        else:
            # OPZIONE 2:
            # Ottimizzazione locale: ri-decodifica dei tour
            tours = []
            current_tour_cities = []
                
            # 1. Dividiamo il percorso originale in tour che partono e tornano a zero (escludendo gli 0)
            for n, w in ind:
                if n == 0:
                    if current_tour_cities:
                        #print("Tour trovato:", current_tour_cities)
                        tours.append(current_tour_cities)
                        current_tour_cities = []
                else:
                    current_tour_cities.append(n)
                        
            # Aggiungiamo l'ultimo tour se il percorso non finiva con 0
            if current_tour_cities:
                #print("Tour trovato:", current_tour_cities)
                tours.append(current_tour_cities)

            #print("Eseguo ottimizzazione locale sui tour")
            mutated_ind = []
            mutated_ind.append((0, 0))
            # Il percorso deve iniziare con il deposito
            golden_city= set()
            # 2. Per ogni tour trovato, proviamo a ri-eseguire il decoder greedy
            for t in tours:
                # t è una lista di soli nomi di città (interi)
                # escludiamo le città dove ho già raccolto l'oro
                t = [city for city in t if city not in golden_city]
                # trasformiamo t in un set
                t_set = set(t)
                #print ("Tour da ri-decodificare (escludendo città già raccolte):", t_set)
                if len(t_set) > 0:
                    # Re-decodifichiamo il segmento di città
                    new_segment_route, _ = self._evaluate_and_segment(t_set)
                    #print("Nuovo segmento dopo ri-decodifica:", new_segment_route)
                    mutated_ind.extend(new_segment_route[1:])  # escludiamo il primo 0 del segmento, già presente
                    
                    # Aggiorniamo l'insieme delle città dove abbiamo raccolto l'oro
                    golden_city.update(t_set)

            if self.prob.beta >= 1.0:
                mutated_ind, cost= self.multiple_cycle(mutated_ind)
            else:
                cost= self._path_cost(mutated_ind)
                
            return mutated_ind, cost
    
    def _optimize_tour(self, tour: list[tuple[int, float]]) -> list[tuple[int, float]]:
        # Ottimizzazione locale per un singolo tour (escluso il deposito)
        target_city = [(c, w) for c, w in tour if c != 0 and w > 0]
        
        # Creare un dizionario per mappare città -> oro originale
        city_to_gold = {c: self.graph.nodes[c]['gold'] for c, w in target_city}
        
        # Controllo se esiste un percorso più breve per raccogliere l'oro in queste città
        optimize_tour, _ = self._evaluate_and_segment([c for c, w in target_city])
        
        new_tour = []
        for c, w in optimize_tour:
            # Usa il dizionario per ottenere l'oro originale della città
            real_w = city_to_gold.get(c, 0)
            if real_w:
                new_tour.append((c, real_w))
            else:
                new_tour.append((c, 0))
        
        return new_tour

    
    def _crossover(self, parent1: list[tuple[int, float]], parent2: list[tuple[int, float]]) -> tuple[list[tuple[int, float]], float]:
        # Crossover: 
        # estraggo dal primo genitore i tour che completano il raccoglimento di oro per le prime k= tot city/ 2 città e le restanti estraggo i tour dal genitore 2
        
        k= len(self.cities_to_visit) // 2
        tours_p1= []
        collect_gold_in_p1 = {} 
        tour=[]
        # ciclo che estrae dal primo genitore i tour che completano il raccoglimento di oro per le prime k città e le restanti estraggo dal genitore 2
        
        for c, w in parent1[1:]: 
            if c==0:
                # città deposito tour concluso
                if tour:
                    tours_p1.extend([(0,0)]+tour)
                    tour=[]
                # esco dal loop quando quando ho raccolto l'oro in tutte le k città
                if len(collect_gold_in_p1) >= k and all(v == 0 for v in collect_gold_in_p1.values()):
                    break
            else:
                # se la cittè c non è ancora stata visitata e c'è ancora spazio (meno di k città) la inserisco in collect_gold_in_p1
                if c not in collect_gold_in_p1 and w>0 and len(collect_gold_in_p1)<k:
                    # inserisco tuple (c, oro residuo) in collect_gold_in_p1
                    collect_gold_in_p1[c] = self.graph.nodes[c]['gold']-w 
                    tour.append((c,w)) 
                else:
                    # sottrarre il valore w al valore già presente in collect_gold_in_p1 per la città c
                    if c in collect_gold_in_p1:
                        collect_gold_in_p1[c] = max(0, collect_gold_in_p1[c] - w)
                        tour.append((c,w))
                    else:
                        tour.append((c,0))
        #print ("Città raccolte da p1:", collect_gold_in_p1)
        #print("Tour estratti dal genitore 1:", tours_p1)
        # estraggo i tour dal genitore 2 che completano il raccoglimento di oro per le città non ancora raccolte da p1
        tours_p2= []
        # le città che deve raccogliere p2 sono quelle rimanenti 
        collect_gold_in_p2 = {c: self.graph.nodes[c]['gold'] 
                          for c in self.graph.nodes 
                          if c not in collect_gold_in_p1 and c != 0}
        tour=[]
        
        for c, w in parent2[1:]:
            if c==0:
                # città deposito tour concluso
                if tour:
                    # ottimizzare il tour t2 prima di aggiungerlo a tours p2
                    tour = self._optimize_tour(tour) 
                    tours_p2.extend(tour[:-1]) # escludiamo l'ultimo 0 che sarà aggiunto dal tour successivo
                    tour=[]
                # esco dal loop quando quando ho raccolto tutto l'oro
                if all(v == 0 for v in collect_gold_in_p2.values()):
                    break
            else:
                # se la cittè c non è ancora stata visitata e c'è ancora spazio (meno di k città) la inserisco in collect_gold_in_p2
                if c in collect_gold_in_p2 and w>0 :
                    #print(f"Raccolgo oro in città {c} da genitore 2, oro residuo: {collect_gold_in_p2[c] - w}")
                    collect_gold_in_p2[c] = max(0, collect_gold_in_p2[c] - w)
                    tour.append((c,w))
                else:
                    tour.append((c,0))
        
        offspring = tours_p1 + tours_p2 + [(0,0)]  # aggiungo il ritorno finale al deposito
        cost = self._path_cost(offspring)
        #print("Città raccolte da p2:", collect_gold_in_p2)
        #print("Tour estratti dal genitore 2:", tours_p2)
        #print("Offspring dopo crossover:", offspring)
        # print(f"Costo dell'offspring: {cost}")
        return offspring, cost
    
    def _create_baseline_individual(self) -> tuple[list[tuple[int, float]], float]:
        # Creo un individuo basato sulla soluzione baseline ogni città target si torna al deposito
        path= []
        for target_city in self.cities_to_visit:
            if self.graph.nodes[target_city]['gold'] == 0:
                continue  # se non c'è oro, salto la città
            andata= self.all_paths[0][target_city]  # percorso dal deposito alla città target
            path.extend([(c, 0) for c in andata[:-1]])  # aggiungo il percorso di andata (escludendo la città target)
            w= self.graph.nodes[target_city]['gold']
            path.append((target_city, w))  # aggiungo la città target con l'oro raccolto
            ritorno= self.all_paths[target_city][0]  # percorso di ritorno al deposito
            path.extend([(c, 0) for c in ritorno[1:-1]])  # aggiungo il percorso di ritorno (escludendo city target e deposito)  
            
        path.extend([(0, 0)])  # aggiungo il ritorno finale al deposito
        cost= self._path_cost(path)   
        return path, cost
    
    def _create_nearest_neighbor_individual(self):
        unvisited = set(self.cities_to_visit)
        current_node = 0
        sequence = []
        
        while unvisited:
            # Trova la città più vicina tra quelle rimaste
            next_city = min(unvisited, key=lambda city: self.all_paths[current_node][city])
            sequence.append(next_city)
            unvisited.remove(next_city)
            current_node = next_city
            
        # Passa questa sequenza al tuo decoder segmentato
        return self._evaluate_and_segment(sequence)
    
    def _run_ga_logic(self)-> list[tuple[int, float]]:
        population = [self.greedy_initialization() for _ in range(self.pop_size-1)]
        population.append(self._create_baseline_individual())  
        #print("Popolazione iniziale creata.")
        #print(  [cost for cromo, cost in population] )
        # ordina la popolazione in base al costo (decrescnente)
        population = sorted(population, key=lambda x: x[1])
        best_chromo = population[0][0]
        #print(best_chromo)

        best_cost = population[0][1]
        #print(f"Best cost iniziale: {best_cost}")
        stagnation_counter = 0

        for gen in range(self.generations):
            #print(f"Generazione {gen}, miglior costo: {best_cost}, costo medio: {np.mean([cost for cromo, cost in population])}")
            next_gen = []
            for _ in range(self.offprint//2):
                parents = []
                for _ in range(2):
                    candidates = random.sample(population, 2)
                    parents.append(min(candidates, key=lambda x: x[1])[0])
                # per ogni iterazione 2 figli

                # estraggo un valore tra 0 e 1
                ratio = random.random()
                if ratio < 0.8:
                    # mutation
                    # print("MUTAZIONE")
                    #print("Genitore 1:", parents[0])
                    #print("Genitore 2:", parents[1])
                    offspring1=self._mutate(parents[0])
                    offspring2=self._mutate(parents[1])
                else:
                    # crossover
                    #print("CROSSOVER")
                    offspring1 = self._crossover(parents[0], parents[1])
                    offspring2 = self._crossover(parents[1], parents[0])
                
                next_gen.extend([offspring1, offspring2])

            
            next_gen = [(ind, cost) for ind, cost in sorted(next_gen, key=lambda x: x[1])]  
            population = next_gen[:self.pop_size]  # mantieni solo i migliori
        
            if population[0][1] < best_cost:
                best_chromo, best_cost = population[0]
                stagnation_counter = 0
            else:
                stagnation_counter += 1
            # se il costo rimane lo stess per 5 generazioni, interrompiamo
            #if stagnation_counter >= 5:
             #  break

        return best_chromo

    def solution(self):
        best_chromo = self._run_ga_logic()
        return best_chromo, self._path_cost(best_chromo)

In [43]:
from Problem import Problem

logging.getLogger().setLevel(logging.WARNING)

problems= [
    Problem(100, density=0.2, alpha=1, beta=1),
    Problem(100, density=0.2, alpha=2, beta=1),
    Problem(100, density=0.2, alpha=5, beta=1),
    Problem(100, density=1, alpha=10, beta=1),
    Problem(100, density=1, alpha=50, beta=1),
    Problem(100, density=1, alpha=0.1, beta=1),
    Problem(100, density=1, alpha=0.5, beta=1),
    Problem(100, density=1, alpha=0.8, beta=1),

]

for p in problems:
    print(f"\n\n--- PROBLEM: {p.graph.number_of_nodes()} cities, alpha {p.alpha}, beta {p.beta} ---")
    print(f"Baseline Cost: {p.baseline()}")
    
    solver = GA_Solver(p, pop_size=20, generations=50, offprint=10)
    best_path, best_cost = solver.solution()
    print(f"\n\nGA Cost: {best_cost}")
    print (f"Relative cost: {best_cost/p.baseline()*100:.2f}")



--- PROBLEM: 100 cities, alpha 1, beta 1 ---
Baseline Cost: 25266.40561851071


GA Cost: 25266.405618510726
Relative cost: 100.00


--- PROBLEM: 100 cities, alpha 2, beta 1 ---
Baseline Cost: 50425.309618179184


GA Cost: 50425.30961817908
Relative cost: 100.00


--- PROBLEM: 100 cities, alpha 5, beta 1 ---
Baseline Cost: 125902.02161718454


GA Cost: 125902.0216171845
Relative cost: 100.00


--- PROBLEM: 100 cities, alpha 10, beta 1 ---
Baseline Cost: 181991.77979473467


GA Cost: 181991.77979473467
Relative cost: 100.00


--- PROBLEM: 100 cities, alpha 50, beta 1 ---
Baseline Cost: 909661.0864565478


GA Cost: 909661.0864565475
Relative cost: 100.00


--- PROBLEM: 100 cities, alpha 0.1, beta 1 ---
Baseline Cost: 1893.6263959359335


GA Cost: 1893.626395935933
Relative cost: 100.00


--- PROBLEM: 100 cities, alpha 0.5, beta 1 ---
Baseline Cost: 9170.319462554065


GA Cost: 9170.319462554056
Relative cost: 100.00


--- PROBLEM: 100 cities, alpha 0.8, beta 1 ---
Baseline Cost: 14627.8

In [47]:
p=Problem(100, density=0.2, alpha=0.5, beta=1)
solver= GA_Solver(p, pop_size=20, generations=50, offprint=10)
best_path, best_cost = solver._create_nearest_neighbor_individual()
print("Nearest Neighbor path:", best_path)
print("Nearest Neighbor cost:", best_cost)
best_path, best_cost = solver._create_baseline_individual()
print("Baseline path:", best_path)
print("Baseline cost:", best_cost)


Nearest Neighbor path: [(0, 0), (1, 777.6274334029364), (0, 0), (42, 797.0202712368366), (0, 0), (3, 501.24044501614), (0, 0), (63, 311.0243392501504), (0, 0), (40, 110.0348527689692), (0, 0), (38, 784.9786858029353), (0, 0), (40, 0), (41, 829.5991862678536), (40, 0), (0, 0), (38, 0), (54, 0), (2, 971.8545996349064), (54, 0), (38, 0), (0, 0), (53, 0), (72, 0), (55, 0), (7, 0), (16, 739.2450199116262), (7, 0), (55, 0), (72, 0), (53, 0), (0, 0), (1, 0), (52, 867.6231411205725), (1, 0), (0, 0), (36, 0), (12, 581.5354926116814), (36, 0), (0, 0), (1, 0), (22, 489.0954613061818), (1, 0), (0, 0), (38, 0), (78, 0), (20, 398.1806156311157), (78, 0), (38, 0), (0, 0), (1, 0), (26, 924.883620882715), (1, 0), (0, 0), (53, 0), (72, 0), (24, 955.667633789923), (7, 132.69039556873452), (55, 0), (72, 0), (53, 0), (0, 0), (1, 0), (27, 0), (6, 230.4263739688564), (5, 14.922351420493342), (3, 0), (60, 594.8392300926953), (9, 122.7106721239026), (40, 0), (0, 0), (53, 0), (72, 0), (55, 237.42570063458356), 

In [41]:
def check_path(problem, path):
    total_cost = 0
    current_weight = 0
    is_correct=True
    total_gold= sum(problem.graph.nodes[ n ]['gold'] for n in problem.graph.nodes if n !=0 )

    for i in range(len(path)-1):
        u, v = path[i][0], path[i+1][0]
        d= problem.graph[u][v]['dist']
        total_cost += d + (problem.alpha * d * current_weight) ** problem.beta
        # Aggiorna il peso se si raccoglie oro
        if v != 0:  # Non raccogliere oro al deposito
            current_weight += path[i+1][1]
            total_gold -= path[i+1][1]
                
        else:
            current_weight = 0  # Svuota il peso al deposito
    is_correct = abs(total_gold) < 1e-6  # Tolleranza per errori float
    
    return is_correct, total_cost

is_correct, calculated_cost = check_path(p, best_path)
print(f"Calcolated Cost from path: {calculated_cost}")
print(f"Path is correct: {is_correct}")

Calcolated Cost from path: 12686.953618676507
Path is correct: True
