In [1]:
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt
import functools
import random
import time
import random
from pygame_visualization import VRPVisualizator

pygame 2.0.1 (SDL 2.0.14, Python 3.8.10)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [2]:
vizu = VRPVisualizator()

In [3]:
def manhattan(a, b):
    return sum(abs(val1-val2) for val1, val2 in zip(a,b))

In [4]:
def generate_coordinates(n_nodes: int, x_coords_range, y_coords_range, min_distance: float):
    generated_coords = [(0, 0)]
    vizu.add_node(0, 0)
    for i in range(n_nodes - 1):
        is_ok = False
        while not is_ok:
            coord = (random.uniform(*x_coords_range), random.uniform(*y_coords_range))
            is_ok = True
            for node in generated_coords:
                if manhattan(coord, node) < min_distance:
                    is_ok = False
        generated_coords.append(coord)
        vizu.add_node(coord[0], coord[1])
    return generated_coords

In [5]:
def generate_graph_matrix(length: int):
    coords = generate_coordinates(length, (-600, 600), (-400, 400), 10)
    matrix = np.empty((length, length))
    matrix[:] = np.nan
    for i in range(length):
        for j in range(length):
            if i != j:
                dist = manhattan(coords[i], coords[j])
                matrix[i][j] = dist
                matrix[j][i] = dist
    return matrix

In [6]:
vizu.clear_nodes()
graph = generate_graph_matrix(25)

In [7]:
vizu.set_scaling(1.1)

In [8]:
print(graph)

[[          nan  314.42428867  439.21675189  222.94027263  246.72217775
   219.84879184  800.79043951  648.16246733  732.83402819  910.63466914
   744.16068662  207.41325135  530.40209734  359.599353    503.064901
   452.05619687  480.65557027  306.85021205  378.61157559  659.97139686
   503.51551339  341.05584667  136.8119393   435.58974396  368.37590785]
 [ 314.42428867           nan  500.56175435  524.15108974  175.79782514
   521.05960895  486.36615084  346.95165021  570.42894345  609.42385203
   429.73639795  138.47229657  844.82638601  177.60073374  817.48918967
   753.26701398  781.86638738  454.49614142  679.8223927   961.18221398
   189.09122472  655.48013534  252.2999669   736.80056108  126.41431685]
 [ 439.21675189  500.56175435           nan  662.15702453  324.76392921
   659.06554373  986.9279052   314.57471046  918.97149387  471.41791725
   930.2981523   393.55071704  551.34071005  322.96102062  464.32295393
   891.27294877  919.87232217  132.36653984  817.82832749 1099.1

In [9]:
#Evaporation factor for global update of pheromones
RHO = 0.4
#Evaporation factor for local update of pheromones
KAPPA = RHO
#Q
OMICRON = 1
#Impact of pheromones
ALPHA = 1
#Impact of weights
BETA = 1.5
#Initial pheromones
TAU = 1
#Exploration/exploitation trade off
EPSILON = 0.4

In [18]:
class Ant():
    def __init__(self, init_pos: int, trucks_pool: int, vizualizator: VRPVisualizator) -> None:
        self.init_pos = init_pos
        self.vizualizator = vizualizator
        self.current_pos = init_pos
        self.current_path = [init_pos]
        self.paths_history = []
        self.full_nodes_history = [init_pos]
        self.tabou = set()
        self.can_continue = True
        self.trucks_pool = trucks_pool
        self.pick_truck()

    def add_current_to_paths_history(self) -> None:
        self.paths_history.append(self.current_path)
        self.current_path = [self.init_pos]
    
    def add_to_current_path(self, node) -> None:
        self.current_path.append(node)
        self.full_nodes_history.append(node)
        #Depot node is automatically added to tabou in pick_truck()
        if node != 0:
            self.tabou.add(node)
        self.vizualizator.set_path(self.full_nodes_history)

    def move(self, dest: list, pheromones: list) -> None:
        choice = random.uniform(0, 1)

        if choice <= EPSILON:
            dest_picked = self.exploitation(dest, pheromones)
        else:
            dest_picked = self.biased_exploration(dest, pheromones)

        #No destination available
        if not dest_picked:
            self.add_to_current_path(self.init_pos)
            self.add_current_to_paths_history()
            self.can_continue = False
            return

        self.update_pheromones_locally(pheromones, dest_picked)
        self.current_pos = dest_picked
        self.add_to_current_path(self.current_pos)

        if dest_picked == 0:
            self.pick_truck()
            self.add_current_to_paths_history()

    def pick_truck(self):
        self.trucks_pool -= 1
        #Ant can't return to the start if there is no truck left in the pool.
        if self.trucks_pool == 0:
            self.tabou.add(0)

    def exploitation(self, dest: list, pheromones: list) -> int:
        current_best_viability = None
        current_best_dest = None
        for i in range(len(dest)):
            if not np.isnan(dest[i]) and i not in self.tabou:
                viability = pheromones[i] * (OMICRON / dest[i])**BETA
                if not current_best_viability or viability > current_best_viability:
                    current_best_viability = viability
                    current_best_dest = i
        if not current_best_dest:
            print('PATHS:', self.paths_history)
        return current_best_dest

    def biased_exploration(self, dest: list, pheromones: list) -> int:
        probabilities = []
        denominator = 0
        
        #Calculate the denominator first
        for i in range(len(dest)):
            if not np.isnan(dest[i]) and i not in self.tabou:
                denominator += pheromones[i]**ALPHA * (OMICRON/dest[i])**BETA

        #Calculate probabilities of picking one path
        for node, length in enumerate(dest):
            if node in self.tabou:
                probabilities.append(0)
            elif not np.isnan(dest[node]):
                nominator = pheromones[node]**ALPHA * (OMICRON/dest[node])**BETA
                probabilities.append(nominator / denominator)
            else:
                probabilities.append(np.nan)
        
        #If there is no path available return false
        if np.nansum(probabilities) == 0:
            return None

        #Roulette wheel
        cumulative_sum = []
        for i in range(len(probabilities)):
            if np.isnan(probabilities[i]):
                cumulative_sum.append(np.nan)
            elif np.nansum(cumulative_sum) == 0:
                cumulative_sum.append(1.0)
            else:
                cumulative_sum.append(np.nansum(probabilities[i:len(probabilities)]))
        
        #Pick a destination
        rand = random.uniform(0, 1)
        dest_picked = None
        cumulative_sum.append(0)

        for i in range(0, len(cumulative_sum)-1):
            p = cumulative_sum[i]
            nextp = cumulative_sum[i+1] if not np.isnan(cumulative_sum[i+1]) else cumulative_sum[i+2] if not np.isnan(cumulative_sum[i+2]) else cumulative_sum[i+3]
        
            if not np.isnan(p) and rand <= p and rand >= nextp:
                dest_picked = i
        if not dest_picked:
            print('PATHS:', self.paths_history)
        return dest_picked

    def update_pheromones_locally(self, pheromones: list, dest: int) -> None:
        #Apply the ACS local updating rule
        pheromones[dest] = (1-KAPPA) * pheromones[dest] + KAPPA * TAU


In [19]:
class ACO():
    def __init__(self, graph, trucks_pool: int, start: int, vizualizator: VRPVisualizator) -> None:
        self.graph = graph
        self.start = start
        self.vizualizator = vizualizator
        self.current_best_tour = None
        self.current_best_length = float('inf')
        self.current_max_best_length = float('inf')
        self.pheromones = np.ones(graph.shape)
        self.trucks_pool = trucks_pool
        #self.vizualizator.set_pheromones(self.pheromones)

    def run(self, iter: int) -> list:
        for i in range(iter):
            print('Tour:', i)
            self.tour_construction(25)
            self.global_update_pheromones()
        self.vizualizator.set_path(self.flatten_tour(self.current_best_tour))
        print('Best:', self.current_best_tour)
        print('Length:', self.calc_tour_distance(self.current_best_tour))
        return self.current_best_tour

    def tour_construction(self, ant_amount: int) -> None:
        ants = [Ant(self.start, self.trucks_pool, self.vizualizator) for i in range(ant_amount)]
        while ants:
            for ant in ants:
                if ant.can_continue:
                    ant.move(self.graph[ant.current_pos], self.pheromones[ant.current_pos])
                else:
                    self.update_current_best(ant.paths_history)
                    ants.remove(ant)

    def update_current_best(self, paths):
        biggest_length = 0
        for path in paths:
            length = self.calc_path_distance(path)
            biggest_length = length if length > biggest_length else biggest_length

        tour_length = self.calc_tour_distance(paths)

        if biggest_length < self.current_max_best_length:
            print('NEW BEST:', paths)
            print('NEW MAX LENGTH:', biggest_length)
            print('NEW FULL LENGTH:', tour_length)
            self.current_best_tour = paths
            self.current_best_length = tour_length
            self.current_max_best_length = biggest_length

    def global_update_pheromones(self) -> None:
        #Evaporation
        self.pheromones *= 1 - RHO
        #New pheromones
        full_path = self.flatten_tour(self.current_best_tour)
        for i in range(len(full_path)-1):
            current_node = full_path[i]
            next_node = full_path[i+1]
            self.pheromones[current_node][next_node] += RHO * (OMICRON / self.current_best_length)
           #self.pheromones[next_node][current_node] += RHO * (OMICRON / self.current_best_length)
        #self.vizualizator.set_pheromones(self.pheromones)

    def calc_path_distance(self, full_path: list) -> float:
        distance = 0
        for i in range(len(full_path)-1):
            cur_path = full_path[i]
            next_path = full_path[i+1]
            if not np.isnan(self.graph[cur_path][next_path]):
                distance += self.graph[cur_path][next_path]
        return distance

    def calc_tour_distance(self, tour: list) -> float:
        return self.calc_path_distance(self.flatten_tour(tour))

    def flatten_tour(self, tour) -> list:
        return [n for path in tour for n in path]


In [20]:
aco = ACO(graph, 3, 0, vizu)
start = time.time()
path = aco.run(50)
print(path)
print(time.time()-start)

Tour: 0
PATHS: []
PATHS: []
NEW BEST: [[0, 22, 11, 0]]
NEW MAX LENGTH: 458.05286098337797
NEW FULL LENGTH: 458.05286098337797
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
Tour: 1
PATHS: []
PATHS: []
PATHS: []
NEW BEST: [[0, 22, 0]]
NEW MAX LENGTH: 273.62387860031515
NEW FULL LENGTH: 273.62387860031515
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
Tour: 2
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
PATHS: []
Tour: 3
PATHS: []
PATHS: []
PATHS:

KeyboardInterrupt: 

2071.60
2259.60