This code was based on in the following references:

[1] "A Novel Search Algorithm based on Fish School Behavior" published in 2008 by Bastos Filho, Lima Neto, Lins, D. O. Nascimento and P. Lima

[2] "An Enhanced Fish School Search Algorithm" published in 2013 by Bastos Filho and  D. O. Nascimento

[3] "Defining a Standard for Particle Swarm Optimization" published in 2007 by Bratton and Kennedy


### Imports

In [1]:
import os
import sys
import csv
import random
import copy
import math
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from random import randint
from google.colab import drive, files
%matplotlib inline
plt.rcParams["figure.figsize"] = (13,8)

### Connection with Google Drive

In [2]:
drive.mount('/content/drive', force_remount=True) #  Connect to drive
FOLDERNAME = 'POLI/decimo/tcc/Algoritms/Algoritms Notebook/' #  Set folder that have the datasts
assert FOLDERNAME is not None, "[!] Enter the foldername."
sys.path.append('/content/drive/My Drive/{}'.format(FOLDERNAME)) #  Locale the foldeer

%cd /content/drive/My\ Drive/$FOLDERNAME

Mounted at /content/drive
/content/drive/My Drive/POLI/decimo/tcc/Algoritms/Algoritms Notebook


### Class of Fish and School Fish with each method to execute optimization

In [3]:
from ObjectiveFunction import *
from SearchSpaceInitializer import UniformSSInitializer, OneQuarterDimWiseSSInitializer
%cd ../

class Fish(object):
  """
  Classe que define um peixe que contém sua posição (em diversas dimensões),
  seu peso e custo de desempenho. Para auxilio também memoriza o quanto andou
  e quanto de custo mudou.
  """
  def __init__(self, dim):
    nan = float('nan')
    self.pos = [nan for _ in range(dim)]
    self.last_pos = [nan for _ in range(dim)]
    self.delta_pos = np.nan
    self.delta_cost = np.nan
    self.weight = np.nan
    self.cost = np.nan
    # self.has_improved = False

class FSS(object):
  """
  Classe que representa a escola de peixes (classe acima) que irá se locomover
  dentro de uma região e serão avaliados de acordo com uma função objetivo
  """
  def __init__(self, objective_function, space_initializer, n_iter, school_size,
               step_ind_init, step_ind_final, step_vol_init, step_vol_final,
               min_weight, weight_scale, analytic_in=False):
    self.objective_function = objective_function # função de avalição de custo
    self.space_initializer = space_initializer # posições iniciais dos peixes

    self.dim = objective_function.dim
    self.minf = objective_function.minf # limite minimo da função
    self.maxf = objective_function.maxf # limite máximo da função
    self.n_iter = n_iter
    self.barycenter = np.zeros(self.dim)

    self.school_size = school_size  # quantidade de peixes
    self.step_ind_init = step_ind_init
    self.step_ind_final = step_ind_final
    self.step_vol_init = step_vol_init
    self.step_vol_final = step_vol_final

    self.curr_step_ind = self.step_ind_init * (self.maxf - self.minf)
    self.curr_step_vol = self.step_vol_init * (self.maxf - self.minf)
    self.min_w = min_weight
    self.w_scale = weight_scale
    self.prev_weight_school = 0.0
    self.curr_weight_school = 0.0
    self.best_fish = None
    
    self.analytic_in = analytic_in
    self.i_net = []
    
    self.optimum_cost_tracking_iter = []
    self.optimum_cost_tracking_eval = []

  def __gen_weight(self):
    return self.w_scale / 2.0

  def __init_fss(self):
    self.optimum_cost_tracking_iter = []
    self.optimum_cost_tracking_eval = []

  def __init_fish(self, pos):
    fish = Fish(self.dim)
    fish.pos = pos
    fish.weight = self.__gen_weight()
    fish.cost = self.objective_function.evaluate(fish.pos)
    self.optimum_cost_tracking_eval.append(self.best_fish.cost)
    return fish

  def __init_school(self):
    self.best_fish = Fish(self.dim)
    self.best_fish.cost = np.inf
    self.curr_weight_school = 0.0
    self.prev_weight_school = 0.0
    self.school = []

    positions = self.space_initializer.sample(self.objective_function, self.school_size)

    for idx in range(self.school_size):
        fish = self.__init_fish(positions[idx])
        self.school.append(fish)
        self.curr_weight_school += fish.weight
    self.prev_weight_school = self.curr_weight_school
    self.update_best_fish()
    self.optimum_cost_tracking_iter.append(self.best_fish.cost)
  
  def update_best_fish(self):
    for fish in self.school:
      if self.best_fish.cost > fish.cost:
        self.best_fish = copy.copy(fish)

  def update_steps(self, curr_iter):
    self.curr_step_ind = self.step_ind_init - curr_iter * float(self.step_ind_init - self.step_ind_final) / self.n_iter
    self.curr_step_vol = self.step_vol_init - curr_iter * float(self.step_vol_init - self.step_vol_final) / self.n_iter

  def max_delta_cost(self):
    max_ = 0
    for fish in self.school:
      if max_ < fish.delta_cost:
        max_ = fish.delta_cost
    return max_
    
  def total_school_weight(self):
    self.prev_weight_school = self.curr_weight_school
    self.curr_weight_school = 0.0
    for fish in self.school:
      self.curr_weight_school += fish.weight
      
  def calculate_barycenter(self):
    barycenter = np.zeros((self.dim,), dtype=float)
    density = 0.0

    for fish in self.school:
      density += fish.weight
      for dim in range(self.dim):
        barycenter[dim] += (fish.pos[dim] * fish.weight)
    for dim in range(self.dim):
      barycenter[dim] = barycenter[dim] / density

    return barycenter

  def feeding(self):
    for fish in self.school:
      if self.max_delta_cost():
        fish.weight = fish.weight + (fish.delta_cost / self.max_delta_cost())
      if fish.weight > self.w_scale:
        fish.weight = self.w_scale
      elif fish.weight < self.min_w:
        fish.weight = self.min_w
  
  def individual_movement(self):
    for fish in self.school:
      new_pos = np.zeros((self.dim,), dtype=float)
      for dim in range(self.dim):
        new_pos[dim] = fish.pos[dim] + (self.curr_step_ind * np.random.uniform(-1, 1))
        if new_pos[dim] < self.minf:
            new_pos[dim] = self.minf
        elif new_pos[dim] > self.maxf:
            new_pos[dim] = self.maxf
      cost = self.objective_function.evaluate(new_pos)
      self.optimum_cost_tracking_eval.append(self.best_fish.cost)
      if cost < fish.cost:
        fish.delta_cost = abs(cost - fish.cost)
        fish.cost = cost
        delta_pos = np.zeros((self.dim,), dtype=float)
        for idx in range(self.dim):
          delta_pos[idx] = new_pos[idx] - fish.pos[idx]
        fish.last_pos = copy.copy(fish.pos)
        fish.delta_pos = delta_pos
        fish.pos = new_pos
      else:
        fish.delta_pos = np.zeros((self.dim,), dtype=float)
        fish.delta_cost = 0
          
  def collective_instinctive_movement(self):
    cost_eval_enhanced = np.zeros((self.dim,), dtype=float)
    density = 0.0
    for fish in self.school:
      density += fish.delta_cost
      for dim in range(self.dim):
        cost_eval_enhanced[dim] += (fish.delta_pos[dim] * fish.delta_cost)
    for dim in range(self.dim):
      if density != 0:
        cost_eval_enhanced[dim] = cost_eval_enhanced[dim] / density
    for fish in self.school:
      new_pos = np.zeros((self.dim,), dtype=float)
      for dim in range(self.dim):
        new_pos[dim] = fish.pos[dim] + cost_eval_enhanced[dim]
        if new_pos[dim] < self.minf:
          new_pos[dim] = self.minf
        elif new_pos[dim] > self.maxf:
          new_pos[dim] = self.maxf

        fish.pos = new_pos
  
  def collective_volitive_movement(self):
    self.total_school_weight()
    barycenter = self.calculate_barycenter()
    self.barycenter = barycenter
    for fish in self.school:
      new_pos = np.zeros((self.dim,), dtype=float)
      for dim in range(self.dim):
        if self.curr_weight_school > self.prev_weight_school:
          new_pos[dim] = fish.pos[dim] - ((fish.pos[dim] - barycenter[dim]) * self.curr_step_vol * np.random.uniform(0, 1))
        else:
          new_pos[dim] = fish.pos[dim] + ((fish.pos[dim] - barycenter[dim]) * self.curr_step_vol * np.random.uniform(0, 1))
        if new_pos[dim] < self.minf:
          new_pos[dim] = self.minf
        elif new_pos[dim] > self.maxf:
          new_pos[dim] = self.maxf

      cost = self.objective_function.evaluate(new_pos)
      self.optimum_cost_tracking_eval.append(self.best_fish.cost)
      fish.cost = cost
      fish.pos = new_pos
      
  def optimize(self):
    self.__init_fss()
    self.__init_school()

    for i in range(self.n_iter):
      self.individual_movement()
      self.update_best_fish()
      self.feeding()
      self.collective_instinctive_movement()
      self.collective_volitive_movement()
      self.update_steps(i)
      self.update_best_fish()
      self.optimum_cost_tracking_iter.append(self.best_fish.cost)
      #print "Iteration: ", i, " Cost: ", self.best_fish.cost

/content/drive/MyDrive/POLI/decimo/tcc/Algoritms


### Validate FSS

Validate learning in each iteration

In [4]:
def create_dir(path):
  directory = os.path.dirname(path)
  try:
    os.stat(directory)
  except:
    os.mkdir(directory)

def main():
  for d in [15, 30]:
    print (f"starting FSS ({d})")
    search_space_initializer = UniformSSInitializer()
    result_path = os.path.dirname(os.path.abspath('Algoritms')) + os.sep + "Results" + os.sep + f"{d}d" + os.sep
    num_exec = 1
    school_size = 30
    num_iterations = 1000
    step_individual_init = 0.1
    step_individual_final = 0.0001
    step_volitive_init = 0.01
    step_volitive_final = 0.001
    min_w = 1
    w_scale = num_iterations / 2.0

    unimodal_funcs = [SphereFunction, RotatedHyperEllipsoidFunction, RosenbrockFunction, DixonPriceFunction, PermFunction, QuarticNoiseFunction]
    multimodal_funcs = [GeneralizedShwefelFunction, RastriginFunction, AckleyFunction, GriewankFunction, LeviFunction, Levi13Function]
    regular_functions = unimodal_funcs + multimodal_funcs

    cec_functions = []

    create_dir(result_path)
    f_handle_csv = open(result_path + "/FSS_iter.csv", 'w+')
    writer_csv = csv.writer(f_handle_csv, delimiter=",")
    header = ['opt', 'func', 'exec_time'] + [f"run{str(i+1)}" for i in range(num_iterations)]
    writer_csv.writerow(header)

    for benchmark_func in regular_functions:
      func = benchmark_func(d)
      start = time.time()
      bests_iter, bests_eval = run_experiments(num_iterations, school_size, num_exec, func,
                            search_space_initializer, step_individual_init,
                            step_individual_final, step_volitive_init,
                            step_volitive_final, min_w, w_scale)
      end = time.time()
      row_csv = ['FSS', func.function_name, (end - start)] + [b for b in bests_iter[:num_iterations]]
      writer_csv.writerow(row_csv)
      print(func.function_name, end - start, bests_iter[-1])
      
    f_handle_csv.close()

def run_experiments(n_iter, school_size, num_runs, objective_function,
                    search_space_initializer, step_individual_init,
                    step_individual_final, step_volitive_init,
                    step_volitive_final, min_w, w_scale):
  opt1 = FSS(objective_function=objective_function, space_initializer=search_space_initializer,
              n_iter=n_iter, school_size=school_size, step_ind_init=step_individual_init,
              step_ind_final=step_individual_final, step_vol_init=step_volitive_init,
              step_vol_final=step_volitive_final, min_weight=min_w, weight_scale=w_scale)
  opt1.optimize()
  return opt1.optimum_cost_tracking_iter, opt1.optimum_cost_tracking_eval

main()

starting FSS (15)
Sphere 8.692980527877808 6.132896459575538e-07
RotatedHyperEllipsoid 11.406321287155151 9.295480230756746e-09
Rosenbrock 6.323806047439575 13.580130793993385
Dixon-Price 6.88180947303772 0.24079367484176822
Perm 26.587342739105225 9.032431622630824
Quartic-Noise 8.929491758346558 1.9760237670730987e-05
Generalized-Shwefel 6.578223943710327 4831.606106082125
Rastrigin 6.111978769302368 3.8232779502322956e-06
Ackley 6.613341808319092 0.0005729953129400833
Griewank 9.322747945785522 0.00020864346860383076
Levi 9.44184947013855 0.8822967414628755
Levi-13 9.629534721374512 1.3879416882801274
starting FSS (30)
Sphere 10.53278636932373 2.717754302506804e-11
RotatedHyperEllipsoid 21.891435384750366 1.5716875345465572e-09
Rosenbrock 10.819010257720947 28.61437437816915
Dixon-Price 12.613672494888306 0.2764250582471661
Perm 101.78509569168091 22.14200768481554
Quartic-Noise 14.644481420516968 0.00021704286343997376
Generalized-Shwefel 10.857241153717041 10492.414633368384
Rastr

Validate bests results in multiple sessions of iterations

In [5]:
def create_dir(path):
  directory = os.path.dirname(path)
  try:
    os.stat(directory)
  except:
    os.mkdir(directory)

def main():
  for d in [15, 30]:
    print (f"starting FSS ({d})")
    search_space_initializer = UniformSSInitializer()
    file_path = os.path.dirname(os.path.abspath('Algoritms')) + os.sep + "Executions" + os.sep + f"{d}d" + os.sep
    result_path = os.path.dirname(os.path.abspath('Algoritms')) + os.sep + "Results" + os.sep + f"{d}d" + os.sep
    num_exec = 30
    school_size = 30
    num_iterations = 1000
    step_individual_init = 0.1
    step_individual_final = 0.0001
    step_volitive_init = 0.01
    step_volitive_final = 0.001
    min_w = 1
    w_scale = num_iterations / 2.0

    unimodal_funcs = [SphereFunction, RotatedHyperEllipsoidFunction, RosenbrockFunction, DixonPriceFunction, PermFunction, QuarticNoiseFunction]
    multimodal_funcs = [GeneralizedShwefelFunction, RastriginFunction, AckleyFunction, GriewankFunction, LeviFunction, Levi13Function]
    regular_functions = unimodal_funcs + multimodal_funcs

    # regular_functions = [RosenbrockFunction, RastriginFunction]

    # Notice that for CEC Functions only the following dimensions are available:
    # 2, 5, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100
    cec_functions = []

    create_dir(result_path)
    f_handle_csv = open(result_path + "/FSS_exec.csv", 'w+')
    writer_csv = csv.writer(f_handle_csv, delimiter=",")
    header = ['opt', 'func', 'exec_time'] + [f"run{str(i+1)}" for i in range(num_exec)]
    writer_csv.writerow(header)

    for benchmark_func in regular_functions:
      func = benchmark_func(d)
      
      start = time.time()
      runs = run_experiments(num_iterations, school_size, num_exec, func,
                            search_space_initializer, step_individual_init,
                            step_individual_final, step_volitive_init,
                            step_volitive_final, min_w, w_scale, file_path)
      end = time.time()
      row_csv = ['FSS', func.function_name, (end - start)] + [r for r in runs]
      writer_csv.writerow(row_csv)
      
    f_handle_csv.close()


def run_experiments(n_iter, school_size, num_runs, objective_function,
                    search_space_initializer, step_individual_init,
                    step_individual_final, step_volitive_init,
                    step_volitive_final, min_w, w_scale, save_dir):
  alg_name = "FSS"
  console_out = "Algorithm: {} Function: {} Execution: {} Best Cost: {}"
  if save_dir:
    create_dir(save_dir)
    f_handle_cost_iter = open(save_dir + "/FSS_" + objective_function.function_name + "_cost_iter.txt", 'w+')
    f_handle_cost_eval = open(save_dir + "/FSS_" + objective_function.function_name + "_cost_eval.txt", 'w+')
    runs = []

  for run in range(num_runs):
    opt1 = FSS(objective_function=objective_function, space_initializer=search_space_initializer,
                n_iter=n_iter, school_size=school_size, step_ind_init=step_individual_init,
                step_ind_final=step_individual_final, step_vol_init=step_volitive_init,
                step_vol_final=step_volitive_final, min_weight=min_w, weight_scale=w_scale)
    
    opt1.optimize()
    runs.append(opt1.best_fish.cost)
    print(console_out.format(alg_name, objective_function.function_name, run+1, opt1.best_fish.cost))

    temp_optimum_cost_tracking_iter = np.asmatrix(opt1.optimum_cost_tracking_iter)
    temp_optimum_cost_tracking_eval = np.asmatrix(opt1.optimum_cost_tracking_eval)

    if save_dir:
      np.savetxt(f_handle_cost_iter, temp_optimum_cost_tracking_iter, fmt='%.4e')
      np.savetxt(f_handle_cost_eval, temp_optimum_cost_tracking_eval, fmt='%.4e')

  if save_dir:
    f_handle_cost_iter.close()
    f_handle_cost_eval.close()

  return runs

main()

starting FSS (15)
Algorithm: FSS Function: Sphere Execution: 1 Best Cost: 5.344333318639809e-13
Algorithm: FSS Function: Sphere Execution: 2 Best Cost: 1.091894672174576e-07
Algorithm: FSS Function: Sphere Execution: 3 Best Cost: 1.2397512411836443e-10
Algorithm: FSS Function: Sphere Execution: 4 Best Cost: 6.454431299622177e-10
Algorithm: FSS Function: Sphere Execution: 5 Best Cost: 4.048873840297964e-09
Algorithm: FSS Function: Sphere Execution: 6 Best Cost: 5.303623548478048e-12
Algorithm: FSS Function: Sphere Execution: 7 Best Cost: 4.264002973485806e-08
Algorithm: FSS Function: Sphere Execution: 8 Best Cost: 1.3548994569869407e-11
Algorithm: FSS Function: Sphere Execution: 9 Best Cost: 7.873926619516494e-09
Algorithm: FSS Function: Sphere Execution: 10 Best Cost: 6.328818041919438e-10
Algorithm: FSS Function: Sphere Execution: 11 Best Cost: 4.087725680224146e-09
Algorithm: FSS Function: Sphere Execution: 12 Best Cost: 5.271425077378825e-09
Algorithm: FSS Function: Sphere Execution