# PSO Implementation

In [1]:
# Import necessary libraries
from softpy.evolutionary.candidate import FloatVectorCandidate
from typing import Callable
from softpy.evolutionary.singlestate import MetaHeuristicsAlgorithm
import numpy as np
import random

Start by defining the class for the particle candidate itself.

In [None]:
class ParticleCandidate(FloatVectorCandidate):
    """
    A candidate for particle swarm optimization (PSO) that represents a particle
    in the swarm with position and velocity attributes.
    """
    
    def __init__(self, size: int, lower: np.ndarray, upper: np.ndarray, candidate: np.ndarray, velocity: np.ndarray, 
                 inertia: float, w_l: float, w_n: float, w_g: float):
        # Call parent constructor
        super().__init__(size, candidate, None, lower=lower[0], upper=upper[0], intermediate=False, mutate=None, recombine=None)
        
        # Must have weights sum to 1
        if w_l + w_n + w_g != 1.0:
            raise ValueError("w_l + w_n + w_g must equal 1.0")
        
        # Initialize particle-specific attributes
        self.size = size
        self.lower = lower.astype(float)
        self.upper = upper.astype(float)
        self.candidate = candidate.astype(float)
        self.velocity = velocity.astype(float)
        self.inertia = inertia
        self.w_l = w_l
        self.w_n = w_n
        self.w_g = w_g

    @staticmethod
    def generate(size: int, lower: np.ndarray, upper: np.ndarray, inertia: float, 
                 w_l: float, w_n: float, w_g: float):
        # Generate a random candidate within the bounds  
        candidate = np.random.uniform(lower, upper, size)
        
        # Generate velocity within the specified range
        velocity_range = np.abs(upper - lower)
        velocity = np.random.uniform(-velocity_range, velocity_range, size)
        
        # Create and return particle instance
        return ParticleCandidate(size, lower, upper, candidate, velocity, inertia, w_l, w_n, w_g)
    
    def copy(self):
        # Create a copy of this particle
        # This is necessary so each particle can evolve independently
        # Without it, all best pos would belong to a single particle
        return ParticleCandidate(
            self.size,
            self.lower.copy(),
            self.upper.copy(),
            self.candidate.copy(),
            self.velocity.copy(),
            self.inertia,
            self.w_l,
            self.w_n,
            self.w_g
        )
    
    def mutate(self):
        # Update candidate position by adding velocity
        self.candidate += self.velocity
    
    def recombine(self, local_best, neighborhood_best, best):
     # Update velocity according to PSO formula
        r_l = random.uniform(0, 1)
        r_n = random.uniform(0, 1)
        r_g = random.uniform(0, 1)
        
        self.velocity = (
            self.inertia * self.velocity
            + r_l * self.w_l * (local_best.candidate - self.candidate)
            + r_n * self.w_n * (neighborhood_best.candidate - self.candidate)
            + r_g * self.w_g * (best.candidate - self.candidate)
        )

Now let's define the other necessary class.

In [None]:
class ParticleSwarmOptimizer(MetaHeuristicsAlgorithm):
    ''' Represents the process updating particles in a swarm. '''
    
    def __init__(self, fitness_func, pop_size: int, n_neighbors: np.ndarray, 
                 lower: np.ndarray, upper: np.ndarray, inertia: float, 
                 w_l: float, w_n: float, w_g: float, **kwargs):

        super().__init__(candidate_type=None, fitness_func=fitness_func, **kwargs)
        
        self.pop_size = pop_size
        self.fitness_func = fitness_func
        self.n_neighbors = n_neighbors
        self.particles = []
        
        # Store PSO-specific parameters
        self.lower = lower
        self.upper = upper
        self.inertia = inertia
        self.w_l = w_l
        self.w_n = w_n
        self.w_g = w_g
        
        self.best = np.array([None] * pop_size)  # Array of ParticleCandidate instances for each particle's best position
        self.fitness_best = np.full(pop_size, -1000.0, dtype=float)  # Array of best fitness values for each particle
        self.global_best = None  # ParticleCandidate with best position found so far
        self.global_fitness_best = -1000.0  # Best fitness value found so far
    
    def fit(self, n_iters):
        # Create population
        population = []
        for _ in range(self.pop_size):
            particle = ParticleCandidate.generate(
                size=2,
                lower=self.lower,
                upper=self.upper,
                inertia=self.inertia,
                w_l=self.w_l,
                w_n=self.w_n,
                w_g=self.w_g
            )
            population.append(particle)
        
        self.particles = population
        
        for i in range(n_iters):
            # Evaluate fitness and update bests
            for j in range(self.pop_size):
                candidate = population[j]
                fitness = self.fitness_func(candidate)

                # Update local best using the copy (in order not to point to the same object)
                if fitness > self.fitness_best[j]:
                    self.fitness_best[j] = fitness
                    self.best[j] = candidate.copy()

                # Update global best
                if self.global_best is None or fitness > self.global_fitness_best:
                    self.global_best = candidate.copy()
                    self.global_fitness_best = fitness
            
            # Update velocities and positions for each particle
            for j in range(self.pop_size):
                if self.best[j] is None:
                    continue
                    
                # Find neighborhood best
                possible_neighbors = list(range(self.pop_size))
                possible_neighbors.remove(j)
                
                n_neighbors_count = min(self.n_neighbors[j], len(possible_neighbors))
                if n_neighbors_count > 0:
                    neighbor_indices = np.random.choice(possible_neighbors, size=n_neighbors_count, replace=False)
                    
                    best_neighbor_idx = None
                    best_neighbor_fitness = -1000.0
                    
                    for neighbor_idx in neighbor_indices:
                        if self.best[neighbor_idx] is None:
                            continue
                        neighbor_fitness = self.fitness_best[neighbor_idx]
                        if neighbor_fitness > best_neighbor_fitness:
                            best_neighbor_fitness = neighbor_fitness
                            best_neighbor_idx = neighbor_idx
                    
                    if best_neighbor_idx is not None and self.global_best is not None:
                        neighborhood_best = self.best[best_neighbor_idx]
                        population[j].recombine(self.best[j], neighborhood_best, self.global_best)
                        population[j].mutate()
        

# Try to apply this algorithm to a sample population

Test out a 'treasure hunt' simulation, where I have defined the treasure position, while the PSO has to find it starting from a population of 10 individuals, each with a number of pre-defined neighbors and a 10 by 10 search space.

In [4]:
def hunt(x_treasure=3, y_treasure=7, area=[[0,0], [10,10]], particles=10, iters=20):
    
    def fit(p): 
        # Standard distance function (negative because closer is better)
        return -np.sqrt((p.candidate[0]-x_treasure)**2 + (p.candidate[1]-y_treasure)**2)
    
    pso = ParticleSwarmOptimizer(
        fitness_func=fit, 
        pop_size=particles, 
        # Number of neighbors for each particle
        n_neighbors=np.random.randint(2, 5, size=particles),
        lower=np.array(area[0]), upper=np.array(area[1]),
        inertia=0.5, 
        w_l=0.3, 
        w_n=0.3, 
        w_g=0.4
    )
    
    # Run the PSO for 20 iterations
    pso.fit(iters)
    
    # Once we fit the model, we can check the best position found
    if pso.global_best:
        # Get the best position found
        # and calculate the distance to the treasure at [3, 7]
        pos = pso.global_best.candidate
        dist = np.sqrt((pos[0]-x_treasure)**2 + (pos[1]-y_treasure)**2)
        print(f"Found at [{pos[0]:.2f}, {pos[1]:.2f}], {dist:.2f} away.")

hunt() 

Found at [3.00, 7.00], 0.00 away.


If we try a larger search space, and don't increase the number of iterations, we will see that it would actually take more iterations to reach the treasure now that the area to explore is larger.

In [5]:
hunt(x_treasure=5, y_treasure=8, area=[[0,0], [15,15]], particles=15, iters=10)

Found at [4.95, 7.95], 0.07 away.


That's why, if we increase our iterations to 20 we can see that the PSO is able to reach the treasure. Due to the fact that the neighbourhood is random, we might observe very little deviations from the treasure depending on the runs, especially if we decrease the iterations a little bit.

In [6]:
hunt(x_treasure=5, y_treasure=8, area=[[0,0], [15,15]], particles=15, iters=20)

Found at [5.00, 8.00], 0.00 away.
