# Swarm Optimisation

In this chapter we will consider search algorithms that fit into the general framework of evolutionary algorithms, but aim to imitate optimisation processes based on the swarm behaviour of different types of animals, such as flocks of birds or ant colonies.

In [None]:
import random
import math
import sys
import matplotlib.pyplot as plt
import matplotlib

# For presenting as slides
#plt.rcParams['figure.figsize'] = [12, 8]
#plt.rcParams.update({'font.size': 22})
#plt.rcParams['lines.linewidth'] = 3

We will continue to consider the one max problem we are now very familiar with.

In [None]:
n = 300

def get_random_solution():
    return [random.choice([0,1]) for _ in range(n)]

In [None]:
def get_fitness(solution):
    return sum(solution)

We will compare some new algorithms against baseline algorithms we are now familiar with, and so we need to set up the usual configuration.

In [None]:
max_steps = 500

In [None]:
configuration = {
    "P_xover": 0.7,
    "population_size": 20,
    "tournament_size": 2
}

In [None]:
fitness_values = []

The first baseline and sanity check, as always, is random search.

In [None]:
def randomsearch():
    fitness_values.clear()

    best = None
    best_fitness = -sys.maxsize

    for step in range(max_steps):
        # To make plots comparable between population based algorithms
        candidates = [get_random_solution() for _ in range(configuration["population_size"])]
        candidate = max(candidates, key=lambda k: get_fitness(k))
        fitness = get_fitness(candidate)
        if fitness > best_fitness:
            best = candidate
            best_fitness = fitness
        fitness_values.append(best_fitness)

    return best

As a second baseline, we will use the familiar standard genetic algorithm.

In [None]:
def mutate(solution):
    P_mutate = 1/len(solution)
    mutated = solution[:]
    for position in range(len(solution)):
        if random.random() < P_mutate:
            mutated[position] = 1 - mutated[position]
    return mutated

In [None]:
def tournament_selection(population):
    # Make sure the sample isn't larger than the population
    candidates = random.sample(population, min(len(population), configuration["tournament_size"]))
    winner = max(candidates, key=lambda x: get_fitness(x))    
                
    return winner

In [None]:
def crossover(parent1, parent2):
    pos = random.randint(0, len(parent1))
    offspring1 = parent1[:pos] + parent2[pos:]
    offspring2 = parent2[:pos] + parent1[pos:]
    return offspring1, offspring2

In [None]:
def ga():
    fitness_values.clear()
    population = [get_random_solution() for _ in range(configuration["population_size"])]
    best_solution = max(population, key=lambda k: get_fitness(k))
    best_fitness = get_fitness(best_solution)
    iteration = 0
    
    while iteration < max_steps:
        new_population = []
        iteration += 1

        while len(new_population) < len(population):
            parent1 = tournament_selection(population)
            parent2 = tournament_selection(population)

            if random.random() < configuration["P_xover"]:
                offspring1, offspring2 = crossover(parent1, parent2)
            else:
                offspring1, offspring2 = parent1, parent2

            offspring1 = mutate(offspring1)
            offspring2 = mutate(offspring2)

            new_population.append(offspring1)
            new_population.append(offspring2)

        population.clear()
        population.extend(new_population)

        best_solution = max(population + [best_solution], key=lambda k: get_fitness(k))
        best_fitness = get_fitness(best_solution)
        fitness_values.append(best_fitness)

    return best_solution

Let's recall how these algorithms compare.

In [None]:
ga()
ga_values = fitness_values[:]

randomsearch()
random_values = fitness_values[:]

plt.ylabel('Fitness', fontsize=15)
plt.plot(ga_values, label = "GA")
plt.plot(random_values, label = "Random")
plt.legend()

## Ant Colony Optimisation

Ants walking to and from a food source deposit pheromone on the ground. Other ants perceive the presence of pheromone and tend to follow paths where the pheromone concentration is higher.  Through this mechanism, ants are able to transport food to their nest in a remarkably effective way. The pheromones represent a form of stigmergic communication, i.e., an indirect, non-symbolic form of communication mediated by the environment. This stigmergic information is local, and can only be accessed by those ants that visit the locus in which it was released.

Ant colony optimisation algorithms imitate this process of stigmergic communication by associating pheromone values with the edges of a graph. An ant then represents a randomised traversal of the graph, influenced by the pheromone information. The general scheme of ant algorithms is as follows:

- Step 1: Initialise the pheromone information
- Step 2: For each ant, do the following:
   -  Find a solution (a path) based on the current pheromone trail
   -  Reinforcement: Add pheromone
   -  Evaporation: Reduce pheromone
- Step 3: Stop if terminating condition satisfied; return to Step 2 otherwise 
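
The scheme above can be sketched as a small generic loop. This is only an illustration under simplifying assumptions: the name `run_ant_scheme` and its parameters are hypothetical, and the toy instantiation keeps one pheromone value per bit (read as the probability of setting that bit to 1) rather than pheromones on the edges of a graph.

```python
import random


def run_ant_scheme(initialise, construct, update, fitness, max_iterations):
    """Generic ant loop (hypothetical helper, for illustration only)."""
    pheromones = initialise()                      # Step 1: initialise pheromones
    best = construct(pheromones)
    for _ in range(max_iterations):                # Step 3: stop after a fixed budget
        candidate = construct(pheromones)          # Step 2: an ant builds a solution
        if fitness(candidate) > fitness(best):
            best = candidate
        pheromones = update(pheromones, best)      # Step 2: reinforce and evaporate
    return best


# Toy instantiation for one max on 8 bits (assumed representation, see text).
random.seed(1)  # fixed seed only so that the sketch is repeatable
num_bits, rho = 8, 0.2
low, high = 1 / num_bits, 1 - 1 / num_bits


def toy_update(pheromones, best):
    # Evaporate every value, add rho where the best solution has a 1, then clamp.
    return [min(max((1 - rho) * tau + rho * bit, low), high)
            for tau, bit in zip(pheromones, best)]


result = run_ant_scheme(
    initialise=lambda: [0.5] * num_bits,
    construct=lambda pheromones: [1 if random.random() < tau else 0 for tau in pheromones],
    update=toy_update,
    fitness=sum,
    max_iterations=200,
)
print(result, sum(result))
```

Passing the four steps as functions keeps the loop itself independent of the problem representation; only `construct` and `update` need to know how pheromones are stored.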

There are several popular flavours of ant colony optimisation algorithms, which mainly differ in how pheromones are deposited by ants and how the pheromones evaporate.

In an _Ant System_ (AS), pheromone values are updated by all the ants. In an _Ant Colony System_ (ACS), ants update edges using a local pheromone updating rule, and only the best ant of an iteration is allowed to update the trails by applying a modified global pheromone updating rule. Finally, _Max-Min Ant Systems_ (MMAS) explicitly limit the minimum and maximum values of the pheromone, and only one ant per iteration updates the pheromones (either the best ant found so far or the best ant of the current iteration).

A classical problem to solve with ant colony optimisation is the travelling salesman problem. To solve combinatorial problems rather than graph problems, we require a _construction graph_ on which the ants can deposit pheromones, and which is then used to construct a solution based on these pheromones.

We will create a construction graph for bitvectors. There is a node for each bit, with two alternative outgoing edges which determine whether the ant sets the bit to 0 or 1. These two alternative edges lead to intermediate nodes, which are both connected to the node representing the successive bit. The initial pheromones are set to 1/2, which represents a uniform distribution.
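
To make the structure concrete, here is a hand-written construction graph for a bitvector of length 2. The names `toy_graph` and `toy_pheromones` are only for this illustration. Following the decoding used later in this chapter, the edge from a decision node `i` to `i + 1` is read as a 1, and the edge to `i + 2` as a 0.

```python
# Hand-written construction graph for a 2-bit vector (illustrative only).
# Node 3*i is the decision node for bit i; nodes 3*i + 1 and 3*i + 2 are the
# intermediate nodes reached by its two alternative edges.
toy_graph = {
    0: [1, 2],  # bit 0: edge to node 1 means "1", edge to node 2 means "0"
    1: [3],
    2: [3],
    3: [4, 5],  # bit 1: edge to node 4 means "1", edge to node 5 means "0"
    4: [6],
    5: [6],     # node 6 is the final node and has no outgoing edges
}

# Every edge starts with the same pheromone value of 1/2.
toy_pheromones = {
    (node, successor): 0.5
    for node, successors in toy_graph.items()
    for successor in successors
}

print(len(toy_graph), "nodes with outgoing edges,", len(toy_pheromones), "edges")
```

Each bit therefore contributes three nodes and four edges to the graph.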

In [None]:
def construct_initial_graph():
    num_nodes = 3 * n
    construction_graph = {}
    pheromones = {}
    initial_probability = 1/2
    for i in range(0, num_nodes, 3):

        construction_graph[i]     = [i + 1, i + 2]
        construction_graph[i + 1] = [i + 3]
        construction_graph[i + 2] = [i + 3]

        pheromones[(i, i + 1)]     = initial_probability
        pheromones[(i, i + 2)]     = initial_probability
        pheromones[(i + 1, i + 3)] = initial_probability
        pheromones[(i + 2, i + 3)] = initial_probability

    return construction_graph, pheromones

This can be visualised using GraphViz as usual.

In [None]:
from graphviz import Digraph
def plot_graph(construction_graph, pheromones):
    dot = Digraph()
    # Draw every edge of the construction graph, labelled with its current pheromone value
    for current_node, successors in construction_graph.items():
        for successor in successors:
            dot.edge(str(current_node), str(successor), label=str(pheromones[(current_node, successor)]))
    return dot

In [None]:
n = 10

In [None]:
graph, pheromones  = construct_initial_graph()

In [None]:
plot_graph(graph, pheromones)

In [None]:
n = 300

To create a solution based on a construction graph, an ant traverses the graph starting from the initial node. At each node it chooses one of the outgoing edges with a probability proportional to the pheromone value of that edge.
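
The following sketch illustrates what "proportional to the pheromones" means in practice. It uses `random.choices` from the standard library, which accepts relative weights. The edge labels and the weights 0.8 and 0.2 are made up for this example.

```python
import random
from collections import Counter

random.seed(0)  # fixed seed only so that the sketch is repeatable

edges = ["edge_for_1", "edge_for_0"]   # hypothetical labels for the two alternatives
weights = [0.8, 0.2]                   # assumed pheromone values on these edges

# Draw many choices and compare the observed share with the pheromone share.
draws = 10_000
counts = Counter(random.choices(edges, weights=weights, k=draws))
share = counts["edge_for_1"] / draws
print(f"share of edge_for_1: {share:.3f}")
```

The weights do not need to sum to 1, since `random.choices` normalises them. With these values, roughly four out of five ants would be expected to take the first edge.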

In [None]:
def get_ant(construction_graph, pheromones):
    ant = []
    current_node = 0
    while current_node in construction_graph:
        successors = construction_graph[current_node]
        successor_node = random.choices(successors, [pheromones[(current_node, successor)] for successor in successors])[0]
        ant.append((current_node, successor_node))
        current_node = successor_node

    return ant

In [None]:
ant = get_ant(graph, pheromones)
ant

To convert a graph traversal to a bitstring for onemax, we iterate over the edges contained in the path traversed by the ant, and check whether the edges represent 1 or 0 in our construction graph.
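
As a small worked example, consider a hand-written path through a construction graph for two bits. The variable names are only for this illustration. Only edges that leave a decision node (a node whose number is divisible by 3) carry information about a bit.

```python
# A path through a 2-bit construction graph, given as a list of edges.
toy_path = [(0, 1), (1, 3), (3, 5), (5, 6)]

# Keep only edges leaving decision nodes; the edge to "node + 1" encodes a 1,
# the other alternative encodes a 0.
bits = [1 if to_node == from_node + 1 else 0
        for (from_node, to_node) in toy_path
        if from_node % 3 == 0]

print(bits)  # [1, 0]
```

The edges `(1, 3)` and `(5, 6)` are skipped because they merely lead from an intermediate node to the next decision node.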

In [None]:
def get_bitstring(ant):
    bitstring = []

    for (from_node, to_node) in ant:
        if from_node % 3 == 0:
            if to_node == from_node + 1:
                bitstring.append(1)
            else:
                bitstring.append(0)

    return bitstring

In [None]:
get_bitstring(ant)

Edges traversed by an ant receive additional pheromone, but pheromones also evaporate (decrease) at a parameterised rate $\rho$.

During evaporation, every pheromone value is multiplied by $(1 - \rho)$. We need to make sure that pheromones do not evaporate completely, so we enforce a minimum value of $1/n$.

To deposit pheromones for an ant in our bitstring construction graph, we increase the pheromones for traversed edges by $\rho$, again ensuring that the pheromones do not exceed $1 - 1/n$.
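
The arithmetic of a single update can be traced with a few numbers. This sketch assumes $\rho = 0.2$ and $n = 300$; the helper names `evaporate` and `reinforce` are hypothetical and only used here.

```python
import math

rho = 0.2
n = 300
lower, upper = 1 / n, 1 - 1 / n


def evaporate(tau):
    # Multiply by (1 - rho), but never drop below the lower bound 1/n.
    return max((1 - rho) * tau, lower)


def reinforce(tau):
    # Add rho, but never exceed the upper bound 1 - 1/n.
    return min(tau + rho, upper)


tau = 0.5
unused = evaporate(tau)             # edge not on the ant's path: about 0.4
used = reinforce(evaporate(tau))    # edge on the ant's path: about 0.6
print(round(unused, 3), round(used, 3))

# Repeating the update many times drives the values to the two bounds.
never_used, always_used = tau, tau
for _ in range(50):
    never_used = evaporate(never_used)
    always_used = reinforce(evaporate(always_used))
print(never_used == lower, always_used == upper)
```

An edge that is never reinforced settles at $1/n$, and an edge that is always reinforced settles at $1 - 1/n$, so every bit value keeps a small chance of being chosen.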

In [None]:
configuration["rho"] = 0.2
def update_pheromones(pheromones, ant):
    updated_pheromones = {}

    # Evaporation
    for transition in pheromones.keys():
        updated_pheromones[transition] = max((1 - configuration["rho"]) * pheromones[transition], 1 / n)

    # Update based on best ant
    for transition in ant:
        updated_pheromones[transition] = min(updated_pheromones[transition] + configuration["rho"], 1 - 1 / n)

    return updated_pheromones

We consider a Max-Min Ant System (MMAS), which starts from a construction graph with all pheromones set to 1/2. For each iteration it creates an ant that traverses the construction graph, derives the bitstring represented by the path, and calculates the fitness of that bitstring. The pheromones are then updated by the overall best ant observed so far.

In [None]:
def mmas():
    fitness_values.clear()
    construction_graph, pheromones = construct_initial_graph()

    best_ant = get_ant(construction_graph, pheromones)
    best_fitness = get_fitness(get_bitstring(best_ant))

    pheromones = update_pheromones(pheromones, best_ant)

    iteration = 0
    while iteration < max_steps * configuration["population_size"]:
        iteration += 1
        if iteration % configuration["population_size"] == 0:
            fitness_values.append(best_fitness)

        current_ant = get_ant(construction_graph, pheromones)
        current_fitness = get_fitness(get_bitstring(current_ant))

        if current_fitness > best_fitness:
            best_ant = current_ant
            best_fitness = current_fitness

        pheromones = update_pheromones(pheromones, best_ant)

    fitness_values.append(best_fitness)

    return get_bitstring(best_ant)

Our `mmas` implementation uses a pheromone update strategy where pheromones are updated only by the globally best ant observed so far. An alternative is to generate λ ants in each iteration, and then to update the pheromones using the best of these λ ants. To keep things simple, we will set λ to our population size parameter.

In [None]:
def mmas_iteration():
    fitness_values.clear()
    construction_graph, pheromones = construct_initial_graph()

    best_ant = get_ant(construction_graph, pheromones)
    best_fitness = get_fitness(get_bitstring(best_ant))

    pheromones = update_pheromones(pheromones, best_ant)

    iteration = 0
    while iteration < max_steps:
        iteration += 1
        fitness_values.append(best_fitness)

        iteration_ant = None
        iteration_fitness = -sys.maxsize

        for i in range(configuration["population_size"]):
            current_ant = get_ant(construction_graph, pheromones)
            current_fitness = get_fitness(get_bitstring(current_ant))

            if current_fitness > iteration_fitness:
                iteration_ant = current_ant
                iteration_fitness = current_fitness

            if current_fitness > best_fitness:
                best_ant = current_ant
                best_fitness = current_fitness

        pheromones = update_pheromones(pheromones, iteration_ant)

    fitness_values.append(best_fitness)

    return get_bitstring(best_ant)

Let's have a look at how our Max-Min Ant Systems compare to our baseline algorithms.

In [None]:
randomsearch()
random_values = fitness_values[:]

ga()
ga_values = fitness_values[:]

mmas()
mmas_values = fitness_values[:]

mmas_iteration()
immas_values = fitness_values[:]

plt.ylabel('Fitness', fontsize=15)
plt.plot(random_values, label = "Random")
plt.plot(ga_values, label = "GA")
plt.plot(mmas_values, label = "MMAS*")
plt.plot(immas_values, label = "MMAS-Iteration")
plt.legend()

## Particle Swarm Optimisation

Particle swarm optimisation (PSO) is loosely based on a flock of birds or school of fish searching for food. Within a flock, the birds communicate by crying out. The intensity of the cry of a bird is proportional to how much food (insects) it can find at its current location. We also assume that birds know which other birds are nearby, and they can recognise who is crying out the loudest.

The flock has a good chance of converging to the location with the most food, if each bird follows the direction which is the combination of (1) keeping the current direction; (2) returning to the location where it found the most food; and (3) moving towards the neighbouring bird whose cry is the loudest.

In PSO, the population is called a swarm, and each individual is called a particle. To represent the three types of information that influence the movement direction, for each particle we record the current position $x$, the best position $p_b$ found so far, and the velocity $v$. In addition, we record the best particle (the global leader $p_g$) of the whole swarm found so far.

The algorithm itself is reminiscent of an evolutionary algorithm. The swarm is initialised randomly, and all particles are evaluated. Instead of the evolutionary operators we are familiar with from genetic algorithms, particles update their velocity and position before their fitness is evaluated. 

We will start by considering a numerical problem where we can visualise the position and velocity, before moving on to one max. Our example problem consists simply of picking a target point in two-dimensional space; the fitness of a particle is then its distance to that point, which we want to minimise.
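
A quick worked example of this fitness, assuming the target point (1000, 1000). The function name `distance_to_target` is only used in this illustration; it relies on `math.hypot` from the standard library.

```python
import math

target = (1000, 1000)  # assumed target point for this illustration


def distance_to_target(position):
    # Euclidean distance between the particle position and the target.
    return math.hypot(position[0] - target[0], position[1] - target[1])


on_target = distance_to_target((1000, 1000))
nearby = distance_to_target((1003, 1004))
print(on_target, nearby)  # 0.0 5.0
```

Since smaller values are better, the optimum has a fitness of 0.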

In [None]:
target = [1000, 1000]
def get_distance(particle):
    return math.sqrt((particle[0] - target[0]) ** 2 + (particle[1] - target[1]) ** 2)

We need some helper functions to visualise the swarm.

In [None]:
import matplotlib.animation as animation
from IPython.display import HTML

ims = []  # global variable to store images of the animation
    
def initialise_plot():
    global ims
    global fig
    global ax
    
    ims = []

    %matplotlib agg
    fig, ax = plt.subplots()
    ims = []
    %matplotlib inline

A plot consists of a number of points.

In [None]:
import numpy

def addPoint(scat, new_point, c='k'):
    # Append one extra point (drawn in colour c) to an existing scatter plot
    old_off = scat.get_offsets()
    new_off = numpy.concatenate([old_off,numpy.array(new_point, ndmin=2)])
    old_c = scat.get_facecolors()
    new_c = numpy.concatenate([old_c, numpy.array(matplotlib.colors.to_rgba(c), ndmin=2)])

    scat.set_offsets(new_off)
    scat.set_facecolors(new_c)

    scat.axes.figure.canvas.draw_idle()

In [None]:
def plot(population):
    function1_values = [x[0] for (x, xb, v) in population]
    function2_values = [x[1] for (x, xb, v) in population]

    x = ax.scatter(function1_values, function2_values, color="blue")
    addPoint(x, target, c='red')
    ims.append((x,))

The velocity of a particle in the $k$-th iteration is updated as follows:

$$v(k+1) = a \cdot v(k) + b \cdot w_1 \cdot (p_b - x(k)) + c \cdot w_2 \cdot (p_g - x(k))$$

where $k$ is the iteration number, $a$ is the inertia weight, $b$ and $c$ are the learning factors called personal influence and social influence, respectively, and $w_1$ and $w_2$ are random numbers taken from $[0, 1]$.

Each particle thus tries to learn from the current leader $p_g$ as well as from the best position $p_b$ it has found itself.

The update also depends on the random factors $w_1$ and $w_2$, so different runs will generally produce different results.

Based on the new velocity, the new position is obtained as follows:

$$x(k+1) = x(k) + v(k+1)$$
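
A single update step can be traced by hand. The sketch below assumes the common form of the rule in which the inertia, personal, and social terms are added up, and it fixes the random factors so that the numbers can be checked. All concrete values are made up for this example, and the clamping to a maximum velocity is an extra step that is often applied in practice.

```python
# Assumed rule: v(k+1) = a*v(k) + b*w1*(p_b - x(k)) + c*w2*(p_g - x(k))
a, b, c = 1.0, 2.0, 2.0   # inertia weight, personal and social influence (example values)
w1, w2 = 0.5, 0.25        # normally drawn at random from [0, 1]; fixed here
vmax = 200.0              # assumed velocity limit

x = [0.0, 0.0]            # current position x(k)
v = [10.0, -10.0]         # current velocity v(k)
p_b = [100.0, 0.0]        # best position found by this particle
p_g = [200.0, 40.0]       # best position found by the swarm

v_raw = [a * v[i] + b * w1 * (p_b[i] - x[i]) + c * w2 * (p_g[i] - x[i])
         for i in range(2)]
v_next = [max(-vmax, min(vmax, component)) for component in v_raw]
x_next = [x[i] + v_next[i] for i in range(2)]

print(v_raw)   # [210.0, 10.0]
print(v_next)  # [200.0, 10.0]
print(x_next)  # [200.0, 10.0]
```

In the first dimension both the personal and the global best pull the particle in the same direction, so the raw velocity exceeds the limit and is cut back to `vmax`.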

In [None]:
configuration["c1"] = 2
configuration["c2"] = 2
configuration["vmax"] = 200

In [None]:
import numpy
def demo_pso():
    initialise_plot()
    particles = []
    for i in range(configuration["population_size"]):
        x  = [random.randint(-10000, 10000) for _ in range(2)]
        xb = x[:]
        v  = [random.randint(-200, 200) for _ in range(2)]
        particles.append((x, xb, v))

    # Copy the position so that later in-place moves of the particle do not alter the recorded best
    best_solution = min(particles, key=lambda p: get_distance(p[0]))[0][:]
    best_fitness = get_distance(best_solution)

    iteration = 0
    while iteration < max_steps:
        #print(f"Iteration {iteration}: Fitness {best_fitness}: {best_solution}")
        iteration += 1

        # One random factor per dimension of the two-dimensional search space
        r1 = [random.uniform(0, configuration["c1"]) for _ in range(2)]
        r2 = [random.uniform(0, configuration["c2"]) for _ in range(2)]

        for (x, xb, v) in particles:
            for i in range(2):
                v[i] = v[i] + r1[i] * numpy.subtract(xb[i], x[i]) + r2[i] * numpy.subtract(best_solution[i], x[i])
                v[i] = max(v[i], -configuration["vmax"])
                v[i] = min(v[i], configuration["vmax"])
                x[i] = numpy.add(x[i], v[i])

            fitness_current = get_distance(x)
            fitness_best    = get_distance(xb)

            if fitness_current < fitness_best:
                for i in range(2):
                    xb[i] = x[i]
                fitness_best = fitness_current

            if fitness_best < best_fitness:
                best_fitness = fitness_best
                best_solution = xb[:]
        plot(particles)
        
        if iteration % 100 == 0:
            # Change target every 100 iterations
            target[0] = random.randint(-10000, 10000)
            target[1] = random.randint(-10000, 10000)
            # Re-evaluate the leader against the new target, using the distance as fitness
            best_solution = min(particles, key=lambda p: get_distance(p[0]))[0][:]
            best_fitness = get_distance(best_solution)
            #print(f"New target: {target}")
            

    return best_solution

In [None]:
demo_pso()

In [None]:
im_ani = animation.ArtistAnimation(fig, ims, interval=50, repeat_delay=3000, blit=True)
HTML(im_ani.to_jshtml())

We now consider how to use PSO to solve our onemax problem. To represent a bitvector problem, each particle $i$ is a triple:
- Current position $x_i \in \{0, 1\}^n$
- Best position so far $x_i^* \in \{0, 1\}^n$
- Velocity $v_i \in \mathbb{R}^n$

The velocity $v_i$ is probabilistically translated into a new position for the particle (i.e., a new solution) using the sigmoid function.
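
The following sketch shows this translation for a small velocity vector: each velocity component is mapped to a probability by the sigmoid function, and the corresponding bit is set to 1 with that probability. The helper `sample_position` and the example velocities are assumptions made for this illustration.

```python
import math
import random


def sigmoid(x):
    return 1 / (1 + math.exp(-x))


def sample_position(velocity, rng):
    # Bit i becomes 1 with probability sigmoid(velocity[i]).
    return [1 if rng.random() < sigmoid(v) else 0 for v in velocity]


rng = random.Random(42)        # fixed seed only so that the sketch is repeatable
velocity = [-4.0, 0.0, 4.0]    # example velocities

probabilities = [sigmoid(v) for v in velocity]
samples = [sample_position(velocity, rng) for _ in range(5000)]
frequencies = [sum(sample[i] for sample in samples) / len(samples) for i in range(3)]

print([round(p, 3) for p in probabilities])
print([round(f, 3) for f in frequencies])
```

A strongly negative velocity makes a 1 unlikely, a velocity of 0 corresponds to a fair coin flip, and a strongly positive velocity makes a 1 very likely.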

In [None]:
def sigmoid(x):
    return 1/(1 + math.exp(-x))

xmin, xmax = -configuration["vmax"], configuration["vmax"]
points = 1000
xlist = list(map(lambda x: xmin + float(xmax - xmin)*x/points, range(points+1)))
ylist = list(map(lambda y: sigmoid(y), xlist))
plt.plot(xlist, ylist)

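When updating a particle, we set $a = b = c = 1$, and draw $r_1, r_2$ from $[0, 2]$:

- $v_i' = v_i + r_1 \times (x_i^* - x_i) + r_2 \times (x^{**} - x_i)$, where $x^{**}$ is the best position found by the whole swarm so far
- $v$ is limited to $[-v_{max}, v_{max}]$, with $v_{max}$ often set to $4$ or $\ln(n - 1)$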
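
The choice of $\ln(n - 1)$ as velocity limit has a simple interpretation: since $1/(1 + e^{-\ln(n-1)}) = 1/(1 + 1/(n-1)) = 1 - 1/n$, the probability of setting a bit to 1 always stays within $[1/n, 1 - 1/n]$, the same bounds we used for the pheromones. The sketch below checks this numerically for a few arbitrarily chosen values of $n$.

```python
import math


def sigmoid(x):
    return 1 / (1 + math.exp(-x))


checks = {}
for n in (10, 300, 1000):
    vmax = math.log(n - 1)
    # Probability of a 1 at the lowest and at the highest allowed velocity.
    checks[n] = (sigmoid(-vmax), sigmoid(vmax))
    print(n, checks[n], (1 / n, 1 - 1 / n))
```

Each bit therefore keeps a chance of at least $1/n$ of taking either value, no matter how far the velocity has drifted.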

In [None]:
configuration["c1"] = 2
configuration["c2"] = 2
configuration["vmax"] = math.log(n - 1)

In [None]:
def pso():
    fitness_values.clear()
    particles = []
    for i in range(configuration["population_size"]):
        x  = [0 for _ in range(n)]
        xb = [0 for _ in range(n)]
        v  = [0 for _ in range(n)]
        particles.append((x, xb, v))

    best_solution = [0 for _ in range(n)]
    best_fitness = get_fitness(best_solution)

    iteration = 0
    while iteration < max_steps:
        iteration += 1

        r1 = [random.uniform(0, configuration["c1"]) for _ in range(n)]
        r2 = [random.uniform(0, configuration["c2"]) for _ in range(n)]

        for (x, xb, v) in particles:
            for i in range(n):
                if random.random() < sigmoid(v[i]):
                    x[i] = 1
                else:
                    x[i] = 0

            fitness_current = get_fitness(x)
            fitness_best    = get_fitness(xb)

            if fitness_current > fitness_best:
                for i in range(n):
                    xb[i] = x[i]
                fitness_best = fitness_current

            if fitness_best > best_fitness:
                best_fitness = fitness_best
                best_solution = xb[:]

        for (x, xb, v) in particles:
            for i in range(n):
                v[i] = v[i] + r1[i] * (xb[i] - x[i]) + r2[i] * (best_solution[i] - x[i])
                v[i] = max(v[i], -configuration["vmax"])
                v[i] = min(v[i], configuration["vmax"])

        fitness_values.append(best_fitness)

    return best_solution

In [None]:
pso()
pso_values = fitness_values[:]

plt.ylabel('Fitness', fontsize=15)
plt.plot(random_values, label = "Random")
plt.plot(ga_values, label = "GA")
plt.plot(mmas_values, label = "MMAS*")
plt.plot(immas_values, label = "MMAS-Iteration")
plt.plot(pso_values, label = "PSO")
plt.legend()

A special variant of PSO is 1-PSO, which works with just one particle. The particle's own best and the global best are then the same position, so the social term is redundant and we set $c_2 = 0$.
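
With only the personal term left, the effect of the velocity update on a single bit is easy to enumerate. The sketch below lists all four combinations of best bit and sampled bit for a fixed random factor; the helper `velocity_change` and the factor 1.5 are assumptions made for this illustration.

```python
def velocity_change(best_bit, sampled_bit, r):
    # Remaining term of the update when the social term is dropped.
    return r * (best_bit - sampled_bit)


r = 1.5  # normally drawn at random; fixed here so the values can be checked
cases = {
    (best_bit, sampled_bit): velocity_change(best_bit, sampled_bit, r)
    for best_bit in (0, 1)
    for sampled_bit in (0, 1)
}

for (best_bit, sampled_bit), change in cases.items():
    print(f"best={best_bit} sampled={sampled_bit} change={change}")
```

The velocity only changes where the sampled bit disagrees with the best solution, and it always moves in the direction that makes the best solution's bit more likely in the next sample.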

In [None]:
def onepso():
    fitness_values.clear()
    
    v = [0 for _ in range(n)]
    best_solution = [0 for _ in range(n)]
    best_fitness = get_fitness(best_solution)

    iteration = 0
    while iteration < max_steps * configuration["population_size"]:
        iteration += 1

        r = [random.uniform(0, configuration["c1"]) for _ in range(n)]

        x = []
        for i in range(n):
            p = 1/(1 + math.exp(-v[i]))
            if random.random() < p:
                x.append(1)
            else:
                x.append(0)

        fitness_current = get_fitness(x)

        if fitness_current > best_fitness:
            best_fitness = fitness_current
            best_solution = x[:]

        for i in range(n):
            v[i] = v[i] + r[i] * (best_solution[i] - x[i])
            v[i] = max(v[i], -configuration["vmax"])
            v[i] = min(v[i], configuration["vmax"])

        if iteration % configuration["population_size"] == 0:
            fitness_values.append(best_fitness)


    return best_solution

In [None]:
onepso()
onepso_values = fitness_values[:]

plt.ylabel('Fitness', fontsize=15)
plt.plot(random_values, label = "Random")
plt.plot(ga_values, label = "GA")
plt.plot(mmas_values, label = "MMAS*")
plt.plot(immas_values, label = "MMAS-Iteration")
plt.plot(pso_values, label = "PSO")
plt.plot(onepso_values, label = "1-PSO")
plt.legend()