# Step by step PSO Tutorial

Below you find a step by step tutorial of a simplified, yet more performant variant of the PSO algorithm. 
Asside from numpy no external dependency is needed. Make sure to import it.

In [3]:
import numpy as np
import sys

Since we want to evaluate a function, we need to define it before starting. I choose the Rastrigin function, which is a little more complex than x^2 but not too complicated. It can be replaced by any number of functions applicable to arrays. 

In [2]:
def rastrigin(row: np.ndarray):
    """
        apply rastrigin-function to matrix-row

    :param row:
    :return:
    """
    f_arr = np.frompyfunc(lambda d: d ** 2 - 10 * np.cos(np.pi * d) + 10, 1, 1)
    return np.sum(f_arr(row))

Next we need to set some constants. n depends on the function, the variable dims sets the dimensionality. 
Values c1 & c2 determined empirically (not by me) and are needed for the PSO algorithm.

In [3]:
n = 5.2
dims = 3
func = rastrigin

c1 = c2 = 1.494

num_particles = 10

Now we initialize the positions of the particles and their velocities. Together they represent our swarm and it's movement.

In [6]:
v = np.random.rand(num_particles, dims) * n +1
x = 2 * n * np.random.rand(num_particles, dims) - n
assert v.shape == x.shape

We will need to save the current best solutions of the particles as well as the global best solution. They will influence the positions or rather the velocities of the particles in our swarm.

In [5]:
best_solution = np.full((1, num_particles), sys.maxsize)
best_point = np.zeros_like(x)
best_global_point = np.zeros(dims)
best_global_solution = sys.maxsize

Now on to the good stuff. Since we defined a very nice function above, which makes it possible to apply a function to a row, we can apply it to all rows/ particles at once:

In [8]:
fx = np.apply_along_axis(func, axis=1, arr=x)

Now we determine the best solutions of all particles. If the current solution is not better than the best solution of a given particle, it is - of course - not changed at all. The index is also used to get the position of the best solution.

In [9]:
idx = fx < best_solution
best_solution[idx] = fx[idx[0]]
best_point[idx[0]] = x[idx[0], :]

Similary, we want to update the global best solution and points.

In [10]:
# update global best solutions
idx = np.argmin(best_solution)
if best_solution[:, idx][0] < best_global_solution:
    best_global_solution = best_solution[:,idx][0]
    best_global_point = x[idx, :]

After calculating the best solutions, we can update the velocities of the particles respective to the positions of their best solutions:
$$v = v + rand * c_1 * (best\_point -x)$$
The same is done with respect to position of the global best solution:
$$v = v + rand * c_2 * (global\_best\_point -x)$$

This will drag the particles into the area of the current best solution.

In [11]:
r = np.random.ranf(2)
v += r[0] * c1 * (best_point - x)
v += c2 * r[1] * (best_global_point - x)

Updating the particles is done by adding the velocites to the positions. In this implementation we don't want the particles to escape the search space, i.e. $-n, n$, thus we use np.clip.

In [12]:
x += v
x = np.clip(x, a_min=-n, a_max=n)
print(f"{best_global_solution}")

27


In [None]:
r1 = np.random.ranf(dims)
# update highest velocity
if np.sum(np.abs(particle.v)) < np.sum(np.abs(max_vel[,])):
    max_vel = particle.v
    
# weird
if np.sum(r1) < np.sum(particle.v):
    v = r1 * v * max_vel ** rand_speed_factor
    # select random dimension
    ri = np.random.randint(dims)
    # constant factor to keep the chaos realistic
    x[ri] = r1[0] * ((2 * n) - n) * self.ws[i] * 0.45

Barely anthing happend? That's normal. The PSO algorithm is run iteratively, so lets add a for-loop an decreasing weights to decrease the influence of personal best solutions towards the end of the execution and wrap into a function.

In [1]:
def run_pso(num_particles, dims, iterations):


    n = 5.2
    func = rastrigin
    c1 = c2 = 1.494    
    
    v = np.random.rand(num_particles, dims) * n +1
    x = 2 * n * np.random.rand(num_particles, dims) - n
    assert v.shape == x.shape

    best_solution = np.full((1, num_particles), sys.maxsize)
    best_point = np.zeros_like(x)
    best_global_point = np.zeros(dims)
    best_global_solution = sys.maxsize
    ws = np.linspace(0.9, 0.4, iterations)

    for i in range(iterations):
        # apply function to all particles
        fx = np.apply_along_axis(func, axis=1, arr=x)

        # update best solutions of points
        idx = fx < best_solution
        best_solution[idx] = fx[idx[0]]
        best_point[idx[0]] = x[idx[0], :]

        # update global best solutions
        idx = np.argmin(best_solution)
        if best_solution[:,idx][0] < best_global_solution:
            best_global_solution = best_solution[:,idx][0]
            best_global_point = x[idx, :]

        # update velocity after formula given constants c1 & c2, some randomness,
        # and personal/ global best solutions.
        r = np.random.ranf(2)
        v = ws[i]*v + r[0] * c1 * (best_point - x)
        # you can start the global updating later, if you want.
        v += c2 * r[1] * (best_global_point - x)

        # update position
        x += v

        # keep all particles in range [-n, n]
        x = np.clip(x, a_min=-n, a_max=n)

        print(f"{best_global_solution}")

num_particles = 30
dims = 10
iterations = 100

#run_pso(num_particles, dims, iterations)

## Modifications
The Algorithm can be adjusted by adding even more chaos for both position and velocity. Let's first look at position, since it's easier to grasp. We first select a random dimension (you could also select multiple dimensions) and some particles, which we want to update. For this I chose 10% of the particles, which is arbitrary. After, the points are reset with a constant factor 0.45.

In [7]:
n = 5.
r_dim = np.random.randint(dims)
r_particles = np.random.choice(range(num_particles), num_particles//10)
x[r_particles, r_dim] = np.random.ranf() * ((2 * n) - n) * 0.45

Next, lets add some randomness to the velocities of the particles. We will need to keep track of the highest velocity and set a minimum velocity. I also added some code to add values once the particles move too slow. Notice that the particles a completely new velocity.

In [11]:
lowest_speed = 0.1
# don't fall asleep.
max_velocity = np.zeros(dims)
p_vel_abs = np.sum(np.abs(v), axis=1)
# update all particles, which have a speed which is below our threshold
v[p_vel_abs < lowest_speed, :] += np.random.ranf(dims)

# largest absolute velocity sum larger than max_vel
max_vel_sum = np.sum(np.abs(max_velocity))
# check if any particle is moving faster 
if np.any(max_vel_sum > p_vel_abs):
    # get index of particle with highest velocity
    _idx = np.argmax(np.sum((v[ max_vel_sum < p_vel_abs , :]), axis=1))
    # update max_velocity
    max_velocity = v[_idx, :]
    
# choose a random particle and update
r_particles = np.random.choice(range(num_particles), num_particles // 10)
# new velocities for selected particles
vals = np.random.ranf((len(r_particles), dims)) * max_velocity
v[r_particles, :] = vals

The code could be inserted directly into the main function. I chose to put into an inner function which let's me keep the code structure without moving everything into a class.