# **Particle Swarm Optimization**



In [1]:
import numpy as np
import random
from particle import Particle

In [2]:
def InitializeSwarm(swarm_size, dimensionality, lower_bound, upper_bound):
    '''It takes as input the swarm size (number of particles I want to create, the dimensionality of the swarm
       (number of parameter for each particle) and the lower and upper bound of the parameter (that are two lists))
       swarm_size: int object. Number of particles;
       dimensionality: int object. Number of particle's components;
       lower_bound: list object. Each list element is the lower bound of the parameter that has that index;
       upper_bound: list object. Each list element is the upper bound of the parameter that has that index;'''

    #np.random.seed(3)
    # Usually the positions of particles are initialized to uniformly cover the search space
    swarm_list = []
    for particle in range(swarm_size):
        position = np.random.uniform(lower_bound, upper_bound, dimensionality)

        velocity = np.zeros(dimensionality)
        # velocity = np.random.random(dimensionality)

        part = Particle(position, velocity)
        swarm_list.append(part)

    return swarm_list

In [3]:
#This is a function that we have used for testing if the class and the algorithm work properly.

# # Let's first define the function we want to use for the evaluation:

# def f(lista):
#     '''Definisco una funzione f che prende in input una lista di tre elementi (ovviamente va cambiata se cambio la swarm size)'''
#     return(lista[0]**2+lista[1]**2+lista[2]**2)

In [4]:
def global_optimum(swarm, problem = 'minimum'):
    '''takes as input a swarm of particles and return the position of global optimum 
       and the value of the fitness function in that global optimum'''

    if problem == 'minimum':
        global_opt = (min([particle.bestfit for particle in swarm]))
        best_global_position = swarm[np.argmin(np.array([particle.bestfit for particle in swarm]))].position
        
    elif problem == 'maximum':
        global_opt = (max([particle.bestfit for particle in swarm]))
        best_global_position = swarm[np.argmax(np.array([particle.bestfit for particle in swarm]))].position

    return best_global_position, global_opt  

In [5]:
def acceleration_coefficient(iteration, total_iterations):

    c1_min = 0.5
    c1_max = 2.5
    c2_min = 0.5
    c2_max = 2.5

    c1 = (c1_min-c1_max)*(iteration/total_iterations) + c1_max
    c2 = (c2_max-c2_min)*(iteration/total_iterations) + c2_min
    return c1,c2


In [6]:
def PSO_inizialization(swarm_size, dim, evaluation_funct, lower_bound, upper_bound, problem):
    '''swarm_size: int object. Number of particles;
       dim: int object. Number of particle's components;
       lower_bound: list object. Each list element is the lower bound of the parameter that has that index;
       upper_bound: list object. Each list element is the upper bound of the parameter that has that index;
       evaluation_funct: function that takes particle.position as input and returns the fitness value;
       problem: string object. 'minimum' or 'maximum' '''

    swarm = InitializeSwarm(swarm_size, dim, lower_bound, upper_bound)

    #First we need to evaluate each particle

    for particle in swarm:
        particle.FitnessCalculator(particle.position, evaluation_funct)

        # When we inizialize the swarm we need also to calculate the local optimum:
        particle.BestLocal(problem)

    return(swarm)

In [7]:
def stop_criteria(criteria, iteration, best_global_fitness, best_global_pos, maximum_tolerance = None, correct_sol_fit = None, max_iteration = None, swarm_diameter = None, swarm_position = None):
    '''Takes as input criteria that is the the type of terminarion criteria we decide to use and can be in the list: [fixed_iteration, maximum_tolerance, swarm_radius, [fixed_iteration, swarm_radius]].
    It returns True if criteria is satisfied (and so we need to stop the algorithm), False otherwise.
    Swarm_position must be a list of array in which each element is the position of a particle.'''

    stopping = False

    if criteria == 'fixed_iteration':
        if max_iteration != None:

            if iteration >= max_iteration: 
                stopping = True

        else: return('You must specify max_iteration')

    elif criteria == 'maximum_tolerance':
        # for using this criteria you must know in advance which is the best solution and which is its fitness.

        if maximum_tolerance != None and correct_sol_fit != None:

            if best_global_fitness <= abs(correct_sol_fit - maximum_tolerance):
                stopping = True

        else: return('You must specify maximum_error and/or correct_sol')

    elif criteria == 'swarm_radius':
        if swarm_diameter != None and swarm_position != None:

            max_radius = [np.linalg.norm(swarm_position[i]-best_global_pos) for i in range(len(swarm_position))].max()
            swarm_radius = max_radius/swarm_diameter

            if swarm_radius <= maximum_tolerance:
                stopping = True

        else: return('You must specify swarm_diameter and/or swarm_position')
    

    elif criteria == ['fixed_iteration', 'swarm_radius']:
        # first we check if the maximum number of iteration is reached and then we check the condition on the swarm radius.
        if swarm_diameter != None and swarm_position != None and max_iteration != None :

            if iteration >= max_iteration: # if I have reached the maximum number of iteration I stop
                stopping = True
            
            else: # In case I have not reached maximum number of iteration I check if the swarm radius is less than tolerance

                max_radius = max([np.linalg.norm(swarm_position[i]-best_global_pos) for i in range(len(swarm_position))])
                swarm_radius = max_radius/swarm_diameter

                if swarm_radius <= maximum_tolerance: #if the swarm radius is less than tolerance I stop
                    stopping = True

        else: return('You must specify maximum_error and/or correct_sol')

        return(stopping)
    
    else: return('YOU MUST SPECIFY A VALID VALUE FOR THE CRITERIA')

In [9]:
from scipy.spatial.distance import pdist, squareform

def PSO_alg(swarm, lower_bound, upper_bound, v_max, evaluation_funct, max_iteration, problem, termination_criteria = 'fixed_iteration', max_tol = 0.1, correct_sol_fit = None):

    '''swarm: list object. Composed by particle objects;
       lower_bound: list object. Each list element is the lower bound of the parameter that has that index;
       upper_bound: list object. Each list element is the upper bound of the parameter that has that index;
       v_max: list object. Each list element is the max velocity of the parameter that has that index;
       evaluation_funct: function that takes particle.position as input and returns the fitness value;
       max_iteration: int object. Number of iterations;
       problem: string object. "minimum" or "maximum". '''

    best_positions = [] # in this variable we will save the best position found at each iteration
    evaluation = [] # We will use this variable for saving the best value of the function evaluation at each iteration

    #CALCULATION OF PARATEMETER FOR TERMINATION CRITERIA
    #Let's set the maximum_tolerance which (for the case of the termination criteria with swarm_radius) is the maximum quantity that can be tha radius (we want a swarm radius almost zero)

    #We create a list with all the position of the particles in the swarm.
    swarm_position = [swarm[i].position for i in range(len(swarm))]

    #Let's calculate the swarm_diameter that will be used for the termination criteria: swarm_radius
    swarm_diameters = pdist(swarm_position)
    swarm_diameters = squareform(swarm_diameters)
    # swarm_diameters is a bidimensional array which contains the distance between each pair of points. So for obtaining the maximum we need to apply the max function two times (on the two dimensions).
    swarm_diameter = swarm_diameters.max()

    #___________________________________________________________________________________

    # First we calculate the starting global best position of the swarm
    best_global_position, global_opt = global_optimum(swarm, problem)

    evaluation.append(global_opt)
    best_positions.append(best_global_position)
    print('Starting swarm position: ', swarm_position, '\n with an evaluation of: ', evaluation)
    


    # Calculate the coefficient needed for updating the velocity of the particles.
    c1, c2 = acceleration_coefficient(swarm[0].iteration, max_iteration)

    # set the termination criteria
    criteria_not_reach = not(stop_criteria(termination_criteria, swarm[0].iteration, global_opt, best_global_position, max_iteration=max_iteration, swarm_position = swarm_position, swarm_diameter=swarm_diameter, maximum_tolerance=max_tol, correct_sol_fit=correct_sol_fit ))

    while criteria_not_reach:

        print('Iteration ', swarm[0].iteration)
        
        for particle in swarm:
            ## We need to calculate the new velocity:",
            particle.VelocityCalculator(c1, c2, best_global_position, w_schedule = 'nonlinearly decreasing', v_max = v_max)

            ## We can calculate the new position:",
            particle.PositionCalculator(lower_bound, upper_bound, evaluation_funct, problem)

            #print('New velocity: ', particle.velocity, '\n New position: ', particle.position, '\n New fitness: ', particle.fitness , '\n Best local position: ', particle.bestp, 'Best local fitness', particle.bestfit ,'\n')

        # Then we update the global optimum
        best_position, opt = global_optimum(swarm, problem)

        if problem == 'minimum':
            if opt < global_opt:
                global_opt = opt
                best_global_position = best_position
        elif problem == 'maximum':
            if opt > global_opt:
                global_opt = opt
                best_global_position = best_position
    
        evaluation.append(global_opt)
        best_positions.append(best_global_position)

        # we calculate the new list of swarm position and thwn we call the stop_criteria function for evakuate if the termination criteria has been reached or not. (Remember that stop_criteria function returns True if the criteria is not satisfied and False when the criteria is satisfied.)
        swarm_position = [swarm[i].position for i in range(len(swarm))]

        print('Swarm position: ', swarm_position)
        print('GLOBAL OPTIMUM: ', global_opt, 'GLOBAL OPT POSITION: ', best_global_position, 'evaluation: ', evaluation[swarm[0].iteration])

        criteria_not_reach = not(stop_criteria(termination_criteria, swarm[0].iteration, global_opt, best_global_position, max_iteration=max_iteration, swarm_position = swarm_position, swarm_diameter=swarm_diameter, maximum_tolerance=max_tol ))
    
    return best_positions, evaluation, best_global_position, global_opt

In [11]:
def PSO(swarm_size, dim, evaluation_funct, lower_bound, upper_bound, v_max, problem, max_iteration, termination_criteria, max_tol):
    '''swarm_size: int object. Number of particle objects;
    lower_bound: list object. Each list element is the lower bound of the parameter that has that index;
    upper_bound: list object. Each list element is the upper bound of the parameter that has that index;
    v_max: list object. Each list element is the max velocity of the parameter that has that index;
    evaluation_funct: function that takes particle.position as input and returns the fitness value;
    max_iteration: int object. Number of iterations;
    problem: string object. "minimum" or "maximum". '''
   
    swarm = PSO_inizialization(swarm_size, dim, evaluation_funct, lower_bound, upper_bound, problem)

    best_positions, evaluation, best_global_position, global_opt = PSO_alg(swarm, lower_bound, upper_bound, v_max, evaluation_funct, max_iteration, problem, termination_criteria, max_tol)
    
    # print('Best position found: ', best_global_position, 'with an evaluation of: ',global_opt)
    return(best_positions, evaluation, best_global_position, global_opt)