Model

In [1]:
__author__ = 'Marek'

In [2]:
class Depot(object):
    def __init__(self, depot_no, demand):
        self._depot_no = depot_no
        self._demand = demand

    @property
    def depot_no(self):
        return self._depot_no

    @property
    def demand(self):
        return self._demand

    def get_closest_depot_no(self, distances):
        all_distances = distances[self.depot_no]
        # ignoring distance to itself
        second_closest_distance = sorted(all_distances)[1]
        return all_distances.index(second_closest_distance)

    def __eq__(self, other):
        if isinstance(other, Depot):
            return self.depot_no == other.depot_no and self.demand == other.demand
        return False

    def __hash__(self):
        return hash((self.depot_no, self.demand))

    def __str__(self):
        return 'Depot: {depot_no: %s, demand: %s}' % (self.depot_no, self.demand)

    def __repr__(self):
        return str(self)

In [3]:
import random


class Route(object):
    def __init__(self, route_no=-1, depots=None):
        if not depots:
            depots = []
        self._route_no = route_no
        self._depots = depots

    @staticmethod
    def of(route):
        return Route(route.route_no, list(route.depots))

    @property
    def route_no(self):
        return self._route_no

    @route_no.setter
    def route_no(self, route_no):
        self._route_no = route_no

    @property
    def depots(self):
        return self._depots

    def sum_of_demands(self):
        result = 0
        for depot in self.depots:
            result += depot.demand
        return result

    def calculate_length(self, distance_array):
        result = 0
        first_segment_length = distance_array[0][self.depots[0].depot_no]
        last_segment_length = distance_array[0][self.depots[-1].depot_no]
        result += first_segment_length + last_segment_length
        for curr, nxt in zip(self.depots, self.depots[1:]):
            result += distance_array[curr.depot_no][nxt.depot_no]
        return result

    def insert_depot(self, depot):
        depots_cnt = len(self.depots)
        after_idx = random.randint(0, depots_cnt)
        self.insert_subroute([depot], after_idx)

    def contains_depot(self, depot_no):
        depot_nos = [depot.depot_no for depot in self.depots]
        return depot_no in depot_nos

    def index_of(self, depot_no):
        depot_nos = [depot.depot_no for depot in self.depots]
        return depot_nos.index(depot_no)

    def insert_subroute(self, subroute, after):
        self.depots[after:after] = subroute

    def __eq__(self, other):
        if isinstance(other, Route):
            return self.route_no == other.route_no and self.depots == other.depots
        return False

    def __str__(self):
        return 'Route: {route_no: %d, depots: %s}' % (self.route_no, self.depots)

    def __repr__(self):
        return self.__str__()

    def __unicode__(self):
        return unicode(self.__str__())

    def __len__(self):
        return len(self.depots)


class Individual(object):
    def __init__(self, routes=None):
        if not routes:
            routes = []
        self._routes = routes

    @staticmethod
    def of(individual):
        return type(individual)([Route.of(r) for r in individual.routes])

    @property
    def routes(self):
        return self._routes

    def add_depot(self, depot, max_demand_on_route):
        if not self.routes:
            self.routes.append(Route(len(self.routes), []))
        last_route = self.routes[-1]
        if last_route.sum_of_demands() + depot.demand <= max_demand_on_route:
            last_route.depots.append(depot)
        else:
            new_route = Route(len(self.routes), [depot])
            self.routes.append(new_route)

    def add_route(self, depots=None, route=None):
        if depots:
            new_route_no = len(self.routes)
            self.routes.append(Route(new_route_no, depots))
        if route:
            new_route_no = len(self.routes)
            route._route_no = new_route_no
            self.routes.append(route)

    def add_routes(self, routes):
        for route in routes:
            self.add_route(route=route)

    @staticmethod
    def _split_route(route, max_demand_on_route):
        depots = route.depots
        if route.sum_of_demands() <= max_demand_on_route:
            result = [route]
        else:
            result = [Route()]
            for i in range(len(route) - 1, -1, -1):
                current_new_route = result[-1]
                current_depot = depots[i]
                next_demand_sum = current_new_route.sum_of_demands() + current_depot.demand
                if next_demand_sum <= max_demand_on_route:
                    current_new_route.depots.append(current_depot)
                else:
                    result.append(Route(depots=[current_depot]))
        return result

    def check_integrity(self, config):
        if config.debug:
            depots_in_individual = [depot for route in self.routes for depot in route.depots]
            if len(set(depots_in_individual)) != config.depot_cnt - 1:
                raise Exception("Invalid depot count")
            demands = [route.sum_of_demands() for route in self.routes]
            for demand in demands:
                if demand > config.max_capacity:
                    raise Exception("Route exceeds max demand")

    def normalize(self, config):
        max_demand_on_route = config.max_capacity
        new_routes = []
        for route in self.routes:
            if route.sum_of_demands() > max_demand_on_route:
                divided_routes = self._split_route(route, max_demand_on_route)
                new_routes += divided_routes
        filtered_routes = [route for route in self.routes if max_demand_on_route >= route.sum_of_demands() > 0]
        self._routes = new_routes + filtered_routes
        for i in range(len(self.routes)):
            self.routes[i].route_no = i
        self.check_integrity(config)

    def route_with_depot(self, depot):
        for route in self.routes:
            if depot in route.depots:
                return route

    def insert_depot(self, depot):
        route_with_depot = self.route_with_depot(depot)
        route_with_depot.depots.remove(depot)
        if random.random() < (1. / (2. * len(self.routes))):
            self.add_route([depot])
        else:
            random.choice(self.routes).insert_depot(depot)

    def __str__(self):
        return 'Individual: {Routes: %s}' % self.routes

    def __repr__(self):
        return str(self)


In [4]:
from unittest import TestCase
from model.depot import Depot
from model.individual import Route, Individual

__author__ = 'Marek'


class TestRoute(TestCase):
    def setUp(self):
        self.distance_matrix = [
            [0, 1, 9, 4, 2, 7, 10, 3, 8, 5],
            [1, 0, 7, 9, 6, 6, 10, 8, 8, 5],
            [9, 7, 0, 9, 7, 5, 1, 1, 4, 5],
            [4, 9, 9, 0, 1, 3, 2, 6, 8, 1],
            [2, 6, 7, 1, 0, 6, 4, 6, 1, 1],
            [7, 6, 5, 3, 6, 0, 7, 5, 1, 8],
            [10, 10, 1, 2, 4, 7, 0, 10, 6, 2],
            [3, 8, 1, 6, 6, 5, 10, 0, 5, 4],
            [8, 8, 4, 8, 1, 1, 6, 5, 0, 6],
            [5, 5, 5, 1, 1, 8, 2, 4, 6, 0]
        ]
        self.depots = [
            Depot(1, 34),
            Depot(2, 45),
            Depot(3, 50),
            Depot(4, 34),
            Depot(5, 21),
            Depot(6, 67),
            Depot(7, 34),
            Depot(8, 56),
            Depot(9, 85)
        ]

    def test_calculate_length_one_depot(self):
        depots = self.depots[0:1]
        route = Route(0, depots)
        length = route.calculate_length(self.distance_matrix)
        self.assertTrue(length == 2)

    def test_calculate_length_many_depots(self):
        depots = self.depots[0:3]
        route = Route(0, depots)
        length = route.calculate_length(self.distance_matrix)
        self.assertEqual(length, 21)

    def test_normalize(self):
        individual = Individual([Route(0, self.depots)])
        individual.normalize(100)

        self.assertGreater(len(individual.routes), 1)
        for route in individual.routes:
            self.assertTrue(route.sum_of_demands() <= 100)

        depots_in_individual = [depot for route in individual.routes for depot in route.depots]
        self.assertItemsEqual(depots_in_individual, self.depots)


ModuleNotFoundError: No module named 'model'

Algorithm

In [None]:
__author__ = 'Marek'


In [None]:
from copy import deepcopy
import random
from model.individual import Route, Individual
from utils.config import Config


class Operators(object):
    def __init__(self, settings=Config()):
        self._settings = settings

    @property
    def settings(self):
        return self._settings

    def _generate_individual(self):
        depots = self.settings.depots
        random.shuffle(depots)
        result = Individual()
        for depot in depots:
            result.add_depot(depot, self.settings.max_capacity)
        return result

    def _generate_individual2(self):
        depots = list(self.settings.depots)
        random.shuffle(depots)
        result = Individual()
        i = 0
        while depots:
            k = random.randrange(1, len(depots) + 1)
            result.routes.append(Route(i, depots[0:k]))
            del depots[0:k]
            i += 1
        return result

    def init_population(self, size=100):
        population = []
        for i in range(size):
            population.append(self._generate_individual())
        return population

    def init_population2(self, size=100):
        population = []
        for i in range(size):
            population.append(self._generate_individual2())
        return population

    def evaluate_individual(self, individual):
        distance_matrix = self.settings.distance_matrix
        total_distance = 0
        for route in individual.routes:
            total_distance += route.calculate_length(distance_matrix)
        return total_distance

    @staticmethod
    def _get_random_subroute(individual):
        random_route = random.choice(individual.routes).depots
        random_route_len = len(random_route)
        if random_route_len <= 1:
            return random_route
        end_idx, start_idx = Operators._get_random_subroute_indices(random_route_len)
        return random_route[start_idx:end_idx]

    def _insert_subroute(self, subroute, individual):
        distances = self.settings.distance_matrix
        first_depot_of_subroute = subroute[0]
        closest_depot_no = first_depot_of_subroute.get_closest_depot_no(distances)
        for route_idx in range(len(individual.routes)):
            current_route = individual.routes[route_idx]
            if current_route.contains_depot(closest_depot_no):
                current_route.insert_subroute(subroute, after=closest_depot_no)

    @staticmethod
    def _get_random_subroute_indices(random_route_len):
        if random_route_len == 1:
            return 0, 1
        start_idx = random.randrange(0, random_route_len - 1)
        end_idx = random.randrange(start_idx + 1, random_route_len + 1)
        return end_idx, start_idx

    def crossover(self, first_individual, second_individual):
        descendant = Individual.of(first_individual)
        subroute = self._get_random_subroute(second_individual)
        self._insert_subroute(subroute, descendant)
        descendant.normalize(self.settings)
        return descendant

    def swap(self, individual):
        routes = individual.routes
        first_route_idx = random.randrange(0, len(routes))
        first_dept_idx = random.randrange(0, len(routes[first_route_idx].depots))

        second_route_idx = random.randrange(0, len(routes))
        second_dept_idx = random.randrange(0, len(routes[second_route_idx].depots))

        first = routes[first_route_idx].depots[first_dept_idx]
        routes[first_route_idx].depots[first_dept_idx] = routes[second_route_idx].depots[
            second_dept_idx]
        routes[second_route_idx].depots[second_dept_idx] = first
        individual.normalize(self.settings)

    def inversion(self, individual):
        random_route = random.choice(individual.routes).depots
        random_route_len = len(random_route)
        if random_route_len > 1:
            end_idx, start_idx = Operators._get_random_subroute_indices(random_route_len)
            random_route[start_idx:end_idx] = reversed(random_route[start_idx:end_idx])
        individual.normalize(self.settings)

    def insertion(self, individual):
        depot_to_insert = random.choice(self.settings.depots)
        individual.insert_depot(depot_to_insert)
        individual.normalize(self.settings)

    def displacement(self, individual):
        route_to_get_subroute_from = random.choice(individual.routes)
        route_to_insert_subroute_to = random.choice(individual.routes)
        start_idx, end_idx = self._get_random_subroute_indices(len(route_to_get_subroute_from))
        subroute = route_to_get_subroute_from.depots[start_idx:end_idx]
        after = random.randrange(0, len(route_to_insert_subroute_to))
        route_to_insert_subroute_to.insert_subroute(subroute,
                                                    after=after)
        del route_to_get_subroute_from.depots[start_idx:end_idx]
        individual.normalize(self.settings)



In [None]:
from unittest import TestCase
from genetic_algorithm_vehicle_routing.operators import Operators
from model.depot import Depot
from model.individual import Route, Individual

__author__ = 'Marek'


class TestOperators(TestCase):
    def setUp(self):
        self.operators = Operators()
        self.depots = self.operators.settings.depots

    def test_inversion(self):
        depots = self.depots[0:3]
        route = Route(0, list(depots))
        individual = Individual([route])
        Operators.inversion(individual)
        self.assertItemsEqual(depots, route.depots)

    def test_insertion(self):
        depots = self.depots
        route = Route(0, list(depots))
        individual = Individual([route])
        individual.normalize(self.operators.settings.max_demand)
        self.operators.insertion(individual)

In [None]:
#    This file is part of DEAP.
#
#    DEAP is free software: you can redistribute it and/or modify
#    it under the terms of the GNU Lesser General Public License as
#    published by the Free Software Foundation, either version 3 of
#    the License, or (at your option) any later version.
#
#    DEAP is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#    GNU Lesser General Public License for more details.
#
#    You should have received a copy of the GNU Lesser General Public
#    License along with DEAP. If not, see <http://www.gnu.org/licenses/>.

import random

from deap import base
from deap import creator
from deap import tools

creator.create("FitnessMax", base.Fitness, weights=(1.0,))
creator.create("Individual", list, fitness=creator.FitnessMax)

toolbox = base.Toolbox()
# Attribute generator
toolbox.register("attr_bool", random.randint, 0, 1)
# Structure initializers
toolbox.register("individual", tools.initRepeat, creator.Individual,
    toolbox.attr_bool, 100)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)

def evalOneMax(individual):
    return sum(individual),

# Operator registering
toolbox.register("evaluate", evalOneMax)
toolbox.register("mate", tools.cxTwoPoint)
toolbox.register("mutate", tools.mutFlipBit, indpb=0.05)
toolbox.register("select", tools.selTournament, tournsize=3)

def main():
    random.seed(64)

    pop = toolbox.population(n=300)
    CXPB, MUTPB, NGEN = 0.5, 0.2, 40

    print("Start of evolution")

    # Evaluate the entire population
    fitnesses = list(map(toolbox.evaluate, pop))
    for ind, fit in zip(pop, fitnesses):
        ind.fitness.values = fit

    print("  Evaluated %i individuals" % len(pop))

    # Begin the evolution
    for g in range(NGEN):
        print("-- Generation %i --" % g)

        # Select the next generation individuals
        offspring = toolbox.select(pop, len(pop))
        # Clone the selected individuals
        offspring = list(map(toolbox.clone, offspring))

        # Apply crossover and mutation on the offspring
        for child1, child2 in zip(offspring[::2], offspring[1::2]):
            if random.random() < CXPB:
                toolbox.mate(child1, child2)
                del child1.fitness.values
                del child2.fitness.values

        for mutant in offspring:
            if random.random() < MUTPB:
                toolbox.mutate(mutant)
                del mutant.fitness.values

        # Evaluate the individuals with an invalid fitness
        invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
        fitnesses = map(toolbox.evaluate, invalid_ind)
        for ind, fit in zip(invalid_ind, fitnesses):
            ind.fitness.values = fit

        print("  Evaluated %i individuals" % len(invalid_ind))

        # The population is entirely replaced by the offspring
        pop[:] = offspring

        # Gather all the fitnesses in one list and print the stats
        fits = [ind.fitness.values[0] for ind in pop]

        length = len(pop)
        mean = sum(fits) / length
        sum2 = sum(x*x for x in fits)
        std = abs(sum2 / length - mean**2)**0.5

        print("  Min %s" % min(fits))
        print("  Max %s" % max(fits))
        print("  Avg %s" % mean)
        print("  Std %s" % std)

    print("-- End of (successful) evolution --")

    best_ind = tools.selBest(pop, 1)[0]
    print("Best individual is %s, %s" % (best_ind, best_ind.fitness.values))

if __name__ == "__main__":
    main()

Start of evolution
  Evaluated 300 individuals
-- Generation 0 --
  Evaluated 181 individuals
  Min 44.0
  Max 66.0
  Avg 54.833333333333336
  Std 4.349584909952722
-- Generation 1 --
  Evaluated 191 individuals
  Min 47.0
  Max 68.0
  Avg 58.45666666666666
  Std 3.455641120769904
-- Generation 2 --
  Evaluated 199 individuals
  Min 52.0
  Max 68.0
  Avg 60.95333333333333
  Std 2.9024970092816367
-- Generation 3 --
  Evaluated 167 individuals
  Min 47.0
  Max 71.0
  Avg 62.96
  Std 2.907186497858939
-- Generation 4 --
  Evaluated 175 individuals
  Min 57.0
  Max 73.0
  Avg 64.99
  Std 2.8489588741621903
-- Generation 5 --
  Evaluated 168 individuals
  Min 58.0
  Max 74.0
  Avg 66.93333333333334
  Std 2.8051539866624524
-- Generation 6 --
  Evaluated 187 individuals
  Min 59.0
  Max 76.0
  Avg 68.91666666666667
  Std 2.826609669236565
-- Generation 7 --
  Evaluated 171 individuals
  Min 62.0
  Max 76.0
  Avg 70.88666666666667
  Std 2.4455038108513407
-- Generation 8 --
  Evaluated 155 i

In [None]:
import cProfile
import random
from deap import base
from deap import creator
from deap import tools
from genetic_algorithm_vehicle_routing.operators import Operators
from model.individual import Individual
from utils.config import Config

random.seed(64)

config = Config(max_demand=27, max_capacity=97, depot_cnt=37, randomize=True, debug=False)
#config = Config()
operators = Operators(settings=config)
creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
creator.create("Individual", Individual, fitness=creator.FitnessMin)


def deap_population(deap_type, size):
    my_population = operators.init_population(size)
    return [deap_type(my_individual.routes) for my_individual in my_population]


def deap_evaluation(*args):
    return operators.evaluate_individual(*args),


def print_stats(pop):
    fits = [ind.fitness.values[0] for ind in pop]
    length = len(pop)
    mean = sum(fits) / length
    sum2 = sum(x * x for x in fits)
    std = abs(sum2 / length - mean ** 2) ** 0.5
    print("  Min %s" % min(fits))
    print("  Max %s" % max(fits))
    print("  Avg %s" % mean)
    print("  Std %s" % std)


def deap_crossover(deap_type, child1, child2):
    descendant = operators.crossover(child1, child2)
    return deap_type(descendant.routes)


toolbox = base.Toolbox()
toolbox.register("population", deap_population, creator.Individual)
toolbox.register("evaluate", deap_evaluation)
toolbox.register("mate", deap_crossover, creator.Individual)
toolbox.register("swap", operators.swap)
toolbox.register("inversion", operators.inversion)
toolbox.register("insertion", operators.insertion)
toolbox.register("displacement", operators.displacement)
toolbox.register("select", tools.selTournament, tournsize=3)


def main():
    gen_num, pop_size = 100, 300
    mate_prob, swap_prob, inversion_prob, insertion_prob, displacement_prob = 0.75, 0.05, 0.1, 0.05, 0.1

    pop = toolbox.population(size=pop_size)

    print("Start of evolution")

    # Evaluate the entire population
    fitnesses = list(map(toolbox.evaluate, pop))
    for ind, fit in zip(pop, fitnesses):
        ind.fitness.values = fit

    print("  Evaluated %i individuals" % len(pop))
    print_stats(pop)

    # Begin the evolution
    for g in range(gen_num):
        print("-- Generation %i --" % g)

        # Select the next generation individuals
        offspring = toolbox.select(pop, 100)
        # Clone the selected individuals
        offspring = list(map(Individual.of, offspring))
        random.shuffle(offspring)

        # Apply crossover and mutation on the offspring
        for child1, child2 in zip(offspring[::2], offspring[1::2]):
            if random.random() < mate_prob:
                descendant = toolbox.mate(child1, child2)
                offspring.append(descendant)

        for mutant in offspring:
            if random.random() < swap_prob:
                toolbox.swap(mutant)
                del mutant.fitness.values
            if random.random() < insertion_prob:
                toolbox.insertion(mutant)
                del mutant.fitness.values
            if random.random() < inversion_prob:
                toolbox.inversion(mutant)
                del mutant.fitness.values
            if random.random() < displacement_prob:
                toolbox.displacement(mutant)
                del mutant.fitness.values

        # Evaluate the individuals with an invalid fitness
        invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
        fitnesses = map(toolbox.evaluate, invalid_ind)
        for ind, fit in zip(invalid_ind, fitnesses):
            ind.fitness.values = fit

        #print("  Evaluated %i individuals" % len(invalid_ind))

        # The population is entirely replaced by the offspring
        pop[:] = offspring

        # Gather all the fitnesses in one list and print the stats
        #print_stats(pop)

    #print("-- End of (successful) evolution --")

    best_ind = tools.selBest(pop, 1)[0]
    print("Best individual is %s, %s" % (best_ind, best_ind.fitness.values))


if __name__ == "__main__":
    main()




Start of evolution
  Evaluated 300 individuals
  Min 1588.0
  Max 2582.0
  Avg 2042.7566666666667
  Std 172.30263333900655
-- Generation 0 --
-- Generation 1 --
-- Generation 2 --
-- Generation 3 --
-- Generation 4 --
-- Generation 5 --
-- Generation 6 --
-- Generation 7 --
-- Generation 8 --
-- Generation 9 --
-- Generation 10 --
-- Generation 11 --
-- Generation 12 --
-- Generation 13 --
-- Generation 14 --
-- Generation 15 --
-- Generation 16 --
-- Generation 17 --
-- Generation 18 --
-- Generation 19 --
-- Generation 20 --
-- Generation 21 --
-- Generation 22 --
-- Generation 23 --
-- Generation 24 --
-- Generation 25 --
-- Generation 26 --
-- Generation 27 --
-- Generation 28 --
-- Generation 29 --
-- Generation 30 --
-- Generation 31 --
-- Generation 32 --
-- Generation 33 --
-- Generation 34 --
-- Generation 35 --
-- Generation 36 --
-- Generation 37 --
-- Generation 38 --
-- Generation 39 --
-- Generation 40 --
-- Generation 41 --
-- Generation 42 --
-- Generation 43 --
-- Gene