Skip to content

Commit

Permalink
Add support for parallel particle evaluation
Browse files Browse the repository at this point in the history
  • Loading branch information
danielcorreia96 authored and ljvmiranda921 committed Apr 6, 2019
1 parent 52338f1 commit 804e011
Show file tree
Hide file tree
Showing 10 changed files with 89 additions and 16 deletions.
30 changes: 30 additions & 0 deletions pyswarms/backend/operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from ..utils.reporter import Reporter
from .handlers import BoundaryHandler, VelocityHandler
from functools import partial


rep = Reporter(logger=logging.getLogger(__name__))
Expand Down Expand Up @@ -207,3 +208,32 @@ def compute_position(swarm, bounds, bh):
raise
else:
return position


def compute_objective_function(swarm, objective_func, pool=None, **kwargs):
"""Evaluate particles using the objective function
This method evaluates each particle in the swarm according to the objective function passed.
If a pool is passed, then the evaluation of the particles is done in parallel using multiple processes.
Parameters
----------
swarm : pyswarms.backend.swarms.Swarm
a Swarm instance
objective_func : function
objective function to be evaluated
pool: multiprocessing.Pool
multiprocessing.Pool to be used for parallel particle evaluation
kwargs : dict
arguments for the objective function
Returns
-------
numpy.ndarray
Cost-matrix for the given swarm
"""
if pool is None:
return objective_func(swarm.position, **kwargs)
else:
results = pool.map(partial(objective_func, **kwargs), np.array_split(swarm.position, pool._processes))
return np.concatenate(results)
4 changes: 3 additions & 1 deletion pyswarms/base/base_discrete.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def _populate_history(self, hist):
self.velocity_history.append(hist.velocity)

@abc.abstractmethod
def optimize(self, objective_func, iters, **kwargs):
def optimize(self, objective_func, iters, n_processes=None, **kwargs):
"""Optimize the swarm for a number of iterations
Performs the optimization to evaluate the objective
Expand All @@ -142,6 +142,8 @@ def optimize(self, objective_func, iters, **kwargs):
objective function to be evaluated
iters : int
number of iterations
n_processes : int
number of processes to use for parallel particle evaluation (default: None = no parallelization)
kwargs : dict
arguments for objective function
Expand Down
4 changes: 3 additions & 1 deletion pyswarms/base/base_single.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def _populate_history(self, hist):
self.velocity_history.append(hist.velocity)

@abc.abstractmethod
def optimize(self, objective_func, iters, **kwargs):
def optimize(self, objective_func, iters, n_processes=None, **kwargs):
"""Optimize the swarm for a number of iterations
Performs the optimization to evaluate the objective
Expand All @@ -142,6 +142,8 @@ def optimize(self, objective_func, iters, **kwargs):
objective function to be evaluated
iters : int
number of iterations
n_processes : int
number of processes to use for parallel particle evaluation (default: None = no parallelization)
kwargs : dict
arguments for objective function
Expand Down
14 changes: 10 additions & 4 deletions pyswarms/discrete/binary.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@

# Import modules
import numpy as np
import multiprocessing as mp

from ..backend.operators import compute_pbest
from ..backend.operators import compute_pbest, compute_objective_function
from ..backend.topology import Ring
from ..backend.handlers import BoundaryHandler, VelocityHandler
from ..base import DiscreteSwarmOptimizer
Expand Down Expand Up @@ -134,7 +135,7 @@ def __init__(
self.vh = VelocityHandler(strategy=vh_strategy)
self.name = __name__

def optimize(self, objective_func, iters, **kwargs):
def optimize(self, objective_func, iters, n_processes=None, **kwargs):
"""Optimize the swarm for a number of iterations
Performs the optimization to evaluate the objective
Expand All @@ -146,6 +147,8 @@ def optimize(self, objective_func, iters, **kwargs):
objective function to be evaluated
iters : int
number of iterations
n_processes : int
number of processes to use for parallel particle evaluation (default: None = no parallelization)
kwargs : dict
arguments for objective function
Expand All @@ -163,11 +166,14 @@ def optimize(self, objective_func, iters, **kwargs):
# Populate memory of the handlers
self.vh.memory = self.swarm.position

# Setup Pool of processes for parallel evaluation
pool = None if n_processes is None else mp.Pool(n_processes)

self.swarm.pbest_cost = np.full(self.swarm_size[0], np.inf)
for i in self.rep.pbar(iters, self.name):
# Compute cost for current position and personal best
self.swarm.current_cost = objective_func(
self.swarm.position, **kwargs
self.swarm.current_cost = compute_objective_function(
self.swarm, objective_func, pool, **kwargs
)
self.swarm.pbest_pos, self.swarm.pbest_cost = compute_pbest(
self.swarm
Expand Down
12 changes: 9 additions & 3 deletions pyswarms/single/general_optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@

# Import modules
import numpy as np
import multiprocessing as mp

from ..backend.operators import compute_pbest
from ..backend.operators import compute_pbest, compute_objective_function
from ..backend.topology import Topology
from ..backend.handlers import BoundaryHandler, VelocityHandler
from ..base import SwarmOptimizer
Expand Down Expand Up @@ -180,7 +181,7 @@ def __init__(
self.vh = VelocityHandler(strategy=vh_strategy)
self.name = __name__

def optimize(self, objective_func, iters, **kwargs):
def optimize(self, objective_func, iters, n_processes=None, **kwargs):
"""Optimize the swarm for a number of iterations
Performs the optimization to evaluate the objective
Expand All @@ -192,6 +193,8 @@ def optimize(self, objective_func, iters, **kwargs):
objective function to be evaluated
iters : int
number of iterations
n_processes : int
number of processes to use for parallel particle evaluation (default: None = no parallelization)
kwargs : dict
arguments for the objective function
Expand All @@ -210,11 +213,14 @@ def optimize(self, objective_func, iters, **kwargs):
self.bh.memory = self.swarm.position
self.vh.memory = self.swarm.position

# Setup Pool of processes for parallel evaluation
pool = None if n_processes is None else mp.Pool(n_processes)

self.swarm.pbest_cost = np.full(self.swarm_size[0], np.inf)
for i in self.rep.pbar(iters, self.name):
# Compute cost for current position and personal best
# fmt: off
self.swarm.current_cost = objective_func(self.swarm.position, **kwargs)
self.swarm.current_cost = compute_objective_function(self.swarm, objective_func, pool=pool, **kwargs)
self.swarm.pbest_pos, self.swarm.pbest_cost = compute_pbest(self.swarm)
best_cost_yet_found = self.swarm.best_cost
# fmt: on
Expand Down
12 changes: 9 additions & 3 deletions pyswarms/single/global_best.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@

# Import modules
import numpy as np
import multiprocessing as mp

from ..backend.operators import compute_pbest
from ..backend.operators import compute_pbest, compute_objective_function
from ..backend.topology import Star
from ..backend.handlers import BoundaryHandler, VelocityHandler
from ..base import SwarmOptimizer
Expand Down Expand Up @@ -141,7 +142,7 @@ def __init__(
self.vh = VelocityHandler(strategy=vh_strategy)
self.name = __name__

def optimize(self, objective_func, iters, **kwargs):
def optimize(self, objective_func, iters, n_processes=None, **kwargs):
"""Optimize the swarm for a number of iterations
Performs the optimization to evaluate the objective
Expand All @@ -153,6 +154,8 @@ def optimize(self, objective_func, iters, **kwargs):
objective function to be evaluated
iters : int
number of iterations
n_processes : int
number of processes to use for parallel particle evaluation (default: None = no parallelization)
kwargs : dict
arguments for the objective function
Expand All @@ -171,11 +174,14 @@ def optimize(self, objective_func, iters, **kwargs):
self.bh.memory = self.swarm.position
self.vh.memory = self.swarm.position

# Setup Pool of processes for parallel evaluation
pool = None if n_processes is None else mp.Pool(n_processes)

self.swarm.pbest_cost = np.full(self.swarm_size[0], np.inf)
for i in self.rep.pbar(iters, self.name):
# Compute cost for current position and personal best
# fmt: off
self.swarm.current_cost = objective_func(self.swarm.position, **kwargs)
self.swarm.current_cost = compute_objective_function(self.swarm, objective_func, pool=pool, **kwargs)
self.swarm.pbest_pos, self.swarm.pbest_cost = compute_pbest(self.swarm)
# Set best_cost_yet_found for ftol
best_cost_yet_found = self.swarm.best_cost
Expand Down
14 changes: 10 additions & 4 deletions pyswarms/single/local_best.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,9 @@

# Import modules
import numpy as np
import multiprocessing as mp

from ..backend.operators import compute_pbest
from ..backend.operators import compute_pbest, compute_objective_function
from ..backend.topology import Ring
from ..backend.handlers import BoundaryHandler, VelocityHandler
from ..base import SwarmOptimizer
Expand Down Expand Up @@ -165,7 +166,7 @@ def __init__(
self.vh = VelocityHandler(strategy=vh_strategy)
self.name = __name__

def optimize(self, objective_func, iters, **kwargs):
def optimize(self, objective_func, iters, n_processes=None, **kwargs):
"""Optimize the swarm for a number of iterations
Performs the optimization to evaluate the objective
Expand All @@ -177,6 +178,8 @@ def optimize(self, objective_func, iters, **kwargs):
objective function to be evaluated
iters : int
number of iterations
n_processes : int
number of processes to use for parallel particle evaluation (default: None = no parallelization)
kwargs : dict
arguments for the objective function
Expand All @@ -195,11 +198,14 @@ def optimize(self, objective_func, iters, **kwargs):
self.bh.memory = self.swarm.position
self.vh.memory = self.swarm.position

# Setup Pool of processes for parallel evaluation
pool = None if n_processes is None else mp.Pool(n_processes)

self.swarm.pbest_cost = np.full(self.swarm_size[0], np.inf)
for i in self.rep.pbar(iters, self.name):
# Compute cost for current position and personal best
self.swarm.current_cost = objective_func(
self.swarm.position, **kwargs
self.swarm.current_cost = compute_objective_function(
self.swarm, objective_func, pool=pool, **kwargs
)
self.swarm.pbest_pos, self.swarm.pbest_cost = compute_pbest(
self.swarm
Expand Down
2 changes: 2 additions & 0 deletions pyswarms/utils/decorators/decorators.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Import modules
import numpy as np
from functools import wraps


def cost(cost_func):
Expand Down Expand Up @@ -39,6 +40,7 @@ def cost_func(x):
The vectorized output for all particles as defined by :code:`cost_func`
"""

@wraps(cost_func)
def cost_dec(particles, **kwargs):
n_particles = particles.shape[0]
vector = np.array(
Expand Down
7 changes: 7 additions & 0 deletions tests/optimizers/abc_test_optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@ def test_ftol_effect(self, options, optimizer):
opt.optimize(sphere, 2000)
assert np.array(opt.cost_history).shape != (2000,)

def test_parallel_evaluation(self, obj_without_args, optimizer, options):
"""Test if parallelization breaks the optimization process"""
import multiprocessing
opt = optimizer(100, 2, options=options)
opt.optimize(obj_without_args, 2000, n_processes=multiprocessing.cpu_count())
assert np.array(opt.cost_history).shape == (2000,)

def test_obj_with_kwargs(self, obj_with_args, optimizer, options):
"""Test if kwargs are passed properly in objfunc"""
x_max = 10 * np.ones(2)
Expand Down
6 changes: 6 additions & 0 deletions tests/optimizers/test_general_optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ def test_ftol_effect(self, optimizer):
optimizer.optimize(sphere, 2000)
assert np.array(optimizer.cost_history).shape != (2000,)

def test_parallel_evaluation(self, obj_without_args, optimizer):
"""Test if parallelization breaks the optimization process"""
import multiprocessing
optimizer.optimize(obj_without_args, 2000, n_processes=multiprocessing.cpu_count())
assert np.array(optimizer.cost_history).shape == (2000,)

@pytest.mark.skip(reason="Some topologies converge too slowly")
def test_obj_with_kwargs(self, obj_with_args, optimizer):
"""Test if kwargs are passed properly in objfunc"""
Expand Down

0 comments on commit 804e011

Please sign in to comment.