/
global_best.py
224 lines (193 loc) · 8.15 KB
/
global_best.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
# -*- coding: utf-8 -*-
r"""
A Global-best Particle Swarm Optimization (gbest PSO) algorithm.
It takes a set of candidate solutions, and tries to find the best
solution using a position-velocity update method. Uses a
star-topology where each particle is attracted to the best
performing particle.
The position update can be defined as:
.. math::
x_{i}(t+1) = x_{i}(t) + v_{i}(t+1)
Where the position at the current timestep :math:`t` is updated using
the computed velocity at :math:`t+1`. Furthermore, the velocity update
is defined as:
.. math::
v_{ij}(t + 1) = w * v_{ij}(t) + c_{1}r_{1j}(t)[y_{ij}(t) - x_{ij}(t)]
+ c_{2}r_{2j}(t)[\hat{y}_{j}(t) - x_{ij}(t)]
Here, :math:`c1` and :math:`c2` are the cognitive and social parameters
respectively. They control the particle's behavior given two choices: (1) to
follow its *personal best* or (2) follow the swarm's *global best* position.
Overall, this dictates if the swarm is explorative or exploitative in nature.
In addition, a parameter :math:`w` controls the inertia of the swarm's
movement.
An example usage is as follows:
.. code-block:: python
import pyswarms as ps
from pyswarms.utils.functions import single_obj as fx
# Set-up hyperparameters
options = {'c1': 0.5, 'c2': 0.3, 'w':0.9}
# Call instance of GlobalBestPSO
optimizer = ps.single.GlobalBestPSO(n_particles=10, dimensions=2,
options=options)
# Perform optimization
stats = optimizer.optimize(fx.sphere, iters=100)
This algorithm was adapted from the earlier works of J. Kennedy and
R.C. Eberhart in Particle Swarm Optimization [IJCNN1995]_.
.. [IJCNN1995] J. Kennedy and R.C. Eberhart, "Particle Swarm Optimization,"
Proceedings of the IEEE International Joint Conference on Neural
Networks, 1995, pp. 1942-1948.
"""
# Import standard library
import logging
# Import modules
import numpy as np
import multiprocessing as mp
from ..backend.operators import compute_pbest, compute_objective_function
from ..backend.topology import Star
from ..backend.handlers import BoundaryHandler, VelocityHandler
from ..base import SwarmOptimizer
from ..utils.reporter import Reporter
class GlobalBestPSO(SwarmOptimizer):
    """Global-best (star-topology) Particle Swarm Optimizer.

    Every particle is attracted to its own personal best and to the single
    best position found by the whole swarm.
    """

    def __init__(
        self,
        n_particles,
        dimensions,
        options,
        bounds=None,
        bh_strategy="periodic",
        velocity_clamp=None,
        vh_strategy="unmodified",
        center=1.00,
        ftol=-np.inf,
        init_pos=None,
    ):
        """Initialize the swarm

        Attributes
        ----------
        n_particles : int
            number of particles in the swarm.
        dimensions : int
            number of dimensions in the space.
        options : dict with keys :code:`{'c1', 'c2', 'w'}`
            a dictionary containing the parameters for the specific
            optimization technique.
                * c1 : float
                    cognitive parameter
                * c2 : float
                    social parameter
                * w : float
                    inertia parameter
        bounds : tuple of numpy.ndarray, optional
            a tuple of size 2 where the first entry is the minimum bound while
            the second entry is the maximum bound. Each array must be of shape
            :code:`(dimensions,)`.
        bh_strategy : str
            a strategy for the handling of out-of-bounds particles.
        velocity_clamp : tuple, optional
            a tuple of size 2 where the first entry is the minimum velocity and
            the second entry is the maximum velocity. It sets the limits for
            velocity clamping.
        vh_strategy : str
            a strategy for the handling of the velocity of out-of-bounds particles.
        center : list or float (default is :code:`1.00`)
            an array of size :code:`dimensions` (or a scalar) used to scale
            the initial positions.
        ftol : float
            relative error in objective_func(best_pos) acceptable for
            convergence. Default is :code:`-np.inf`
        init_pos : numpy.ndarray, optional
            option to explicitly set the particles' initial positions. Set to
            :code:`None` if you wish to generate the particles randomly.
        """
        super(GlobalBestPSO, self).__init__(
            n_particles=n_particles,
            dimensions=dimensions,
            options=options,
            bounds=bounds,
            velocity_clamp=velocity_clamp,
            center=center,
            ftol=ftol,
            init_pos=init_pos,
        )

        # Initialize logger
        self.rep = Reporter(logger=logging.getLogger(__name__))
        # Initialize the resettable attributes
        self.reset()
        # Initialize the topology: Star() means every particle is informed
        # by the single swarm-wide best position.
        self.top = Star()
        self.bh = BoundaryHandler(strategy=bh_strategy)
        self.vh = VelocityHandler(strategy=vh_strategy)
        self.name = __name__

    def optimize(self, objective_func, iters, n_processes=None, **kwargs):
        """Optimize the swarm for a number of iterations

        Performs the optimization to evaluate the objective
        function :code:`f` for a number of iterations :code:`iter.`

        Parameters
        ----------
        objective_func : callable
            objective function to be evaluated
        iters : int
            number of iterations
        n_processes : int, optional
            number of processes to use for parallel particle evaluation
            (default: None = no parallelization)
        kwargs : dict
            arguments for the objective function

        Returns
        -------
        tuple
            the global best cost and the global best position.
        """
        self.rep.log("Obj. func. args: {}".format(kwargs), lvl=logging.DEBUG)
        self.rep.log(
            "Optimize for {} iters with {}".format(iters, self.options),
            lvl=logging.INFO,
        )
        # Populate memory of the handlers so out-of-bounds particles can be
        # restored relative to their last in-bounds position.
        self.bh.memory = self.swarm.position
        self.vh.memory = self.swarm.position

        # Setup Pool of processes for parallel evaluation
        pool = None if n_processes is None else mp.Pool(n_processes)

        self.swarm.pbest_cost = np.full(self.swarm_size[0], np.inf)
        try:
            for i in self.rep.pbar(iters, self.name):
                # Compute cost for current position and personal best
                # fmt: off
                self.swarm.current_cost = compute_objective_function(self.swarm, objective_func, pool=pool, **kwargs)
                self.swarm.pbest_pos, self.swarm.pbest_cost = compute_pbest(self.swarm)
                # Set best_cost_yet_found for ftol
                best_cost_yet_found = self.swarm.best_cost
                self.swarm.best_pos, self.swarm.best_cost = self.top.compute_gbest(self.swarm)
                # fmt: on
                self.rep.hook(best_cost=self.swarm.best_cost)
                # Save to history
                hist = self.ToHistory(
                    best_cost=self.swarm.best_cost,
                    mean_pbest_cost=np.mean(self.swarm.pbest_cost),
                    # In a star topology the neighborhood best IS the global best
                    mean_neighbor_cost=self.swarm.best_cost,
                    position=self.swarm.position,
                    velocity=self.swarm.velocity,
                )
                self._populate_history(hist)
                # Verify stop criteria based on the relative acceptable cost ftol
                relative_measure = self.ftol * (1 + np.abs(best_cost_yet_found))
                if (
                    np.abs(self.swarm.best_cost - best_cost_yet_found)
                    < relative_measure
                ):
                    break
                # Perform velocity and position updates
                self.swarm.velocity = self.top.compute_velocity(
                    self.swarm, self.velocity_clamp, self.vh, self.bounds
                )
                self.swarm.position = self.top.compute_position(
                    self.swarm, self.bounds, self.bh
                )
        finally:
            # Close the pool even if the objective function raised; the
            # original code leaked worker processes on every call.
            if pool is not None:
                pool.close()
                pool.join()

        # Obtain the final best_cost and the final best_position
        final_best_cost = self.swarm.best_cost.copy()
        final_best_pos = self.swarm.pbest_pos[
            self.swarm.pbest_cost.argmin()
        ].copy()
        # Write report in log and return final cost and position
        self.rep.log(
            "Optimization finished | best cost: {}, best pos: {}".format(
                final_best_cost, final_best_pos
            ),
            lvl=logging.INFO,
        )
        return (final_best_cost, final_best_pos)