-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
9 changed files
with
351 additions
and
134 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,50 +1,126 @@ | ||
""" | ||
Optimization algorithms, i.e. the executive code which finds the | ||
actual optimum. | ||
Optimization (minimization, maximization) can be done by many techniques. | ||
This module consists of algorithms capable of optimizing functions y = f(x). | ||
""" | ||
|
||
import logbook | ||
from concert.base import LimitError | ||
import numpy as np | ||
|
||
|
||
logger = logbook.Logger(__name__) | ||
|
||
|
||
def halve(param, feedback, cmp_set, step, epsilon, max_iterations=100): | ||
def halver(function, x_0, initial_step=None, epsilon=None, | ||
max_iterations=100): | ||
""" | ||
Simple optimizer based on interval halving. Optimize function y = f(x), | ||
where x is obtained from the value of parameter *param*, y is the | ||
result of *feedback*, *cmp_set* is a function for comparing the old | ||
and new values (it also swaps old value for the new one), *epsilon* | ||
is the precision to which we want to optimize and *max_iterations* | ||
limits the number of iterations. | ||
Halving the interval, evaluate *function* based on *param*. Use | ||
*initial_step*, *epsilon* precision and *max_iterations*. | ||
""" | ||
# Safe copy for not changing the original. | ||
if initial_step is None: | ||
step = 1 * x_0.units | ||
else: | ||
step = np.copy(initial_step) * initial_step.units | ||
if epsilon is None: | ||
epsilon = 1e-3 * x_0.units | ||
direction = 1 | ||
i = 0 | ||
data = [] | ||
value = feedback() | ||
data.append((param.get().result(), value)) | ||
last_x = x_0 | ||
|
||
y_0 = function(x_0) | ||
|
||
def turn(direction, step): | ||
return -direction, step / 2.0 | ||
|
||
while i < max_iterations: | ||
try: | ||
param.set(param.get().result() + direction * step).wait() | ||
value = feedback() | ||
point_reached = step < epsilon | ||
def move(x_0, direction, step): | ||
return x_0 + direction * step | ||
|
||
x_0 = move(x_0, direction, step) | ||
|
||
if point_reached: | ||
break | ||
while i < max_iterations: | ||
y_1 = function(x_0) | ||
|
||
if not cmp_set(value): | ||
direction, step = turn(direction, step) | ||
if step < epsilon: | ||
break | ||
|
||
data.append((param.get().result(), value)) | ||
logger.debug("value: %g, parameter value: %s" % | ||
(value, str(param.get().result()))) | ||
except LimitError: | ||
if y_1 >= y_0: | ||
# Worse, change direction and move to the half of the last | ||
# good x and the new x. | ||
direction, step = turn(direction, step) | ||
x_0 = (x_0 + last_x) / 2 | ||
else: | ||
# OK, move forward. | ||
last_x = x_0 | ||
x_0 = move(x_0, direction, step) | ||
|
||
y_0 = y_1 | ||
i += 1 | ||
|
||
return data | ||
return x_0 | ||
|
||
|
||
def quantized(function): | ||
""" | ||
A helper function meant to be used as a decorator to quantize | ||
a *function* which does not take units into account. | ||
""" | ||
def wrapper(eval_func, x_0, *args, **kwargs): | ||
return function(lambda x: eval_func(x * x_0.units), | ||
x_0, *args, **kwargs) | ||
|
||
wrapper.__doc__ = function.__doc__ | ||
|
||
return wrapper | ||
|
||
|
||
@quantized | ||
def down_hill(function, x_0, **kwargs): | ||
""" | ||
Downhill simplex algorithm from :py:func:`scipy.optimize.fmin`. | ||
Please refer to the scipy function for additional arguments information. | ||
""" | ||
from scipy import optimize | ||
|
||
return optimize.fmin(function, x_0, disp=0, **kwargs)[0] * x_0.units | ||
|
||
|
||
@quantized | ||
def powell(function, x_0, **kwargs): | ||
""" | ||
Powell's algorithm from :py:func:`scipy.optimize.fmin_powell`. | ||
Please refer to the scipy function for additional arguments information. | ||
""" | ||
from scipy import optimize | ||
|
||
return optimize.fmin_powell(function, x_0, disp=0, **kwargs) * x_0.units | ||
|
||
|
||
@quantized | ||
def nonlinear_conjugate(function, x_0, **kwargs): | ||
""" | ||
Nonlinear conjugate gradient algorithm from | ||
:py:func:`scipy.optimize.fmin_cg`. | ||
Please refer to the scipy function for additional arguments information. | ||
""" | ||
from scipy import optimize | ||
|
||
return optimize.fmin_cg(function, x_0, disp=0, **kwargs)[0] * x_0.units | ||
|
||
|
||
@quantized | ||
def bfgs(function, x_0, **kwargs): | ||
""" | ||
Broyde-Fletcher-Goldfarb-Shanno (BFGS) algorithm from | ||
:py:func:`scipy.optimize.fmin_bfgs`. | ||
Please refer to the scipy function for additional arguments information. | ||
""" | ||
from scipy import optimize | ||
|
||
return optimize.fmin_bfgs(function, x_0, disp=0, **kwargs)[0] * x_0.units | ||
|
||
|
||
@quantized | ||
def least_squares(function, x_0, **kwargs): | ||
""" | ||
Least squares algorithm from :py:func:`scipy.optimize.leastsq`. | ||
Please refer to the scipy function for additional arguments information. | ||
""" | ||
from scipy import optimize | ||
|
||
return optimize.leastsq(function, x_0, **kwargs)[0][0] * x_0.units |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,58 +1,85 @@ | ||
"""Optimization base class for executing various algorithms.""" | ||
""" | ||
Optimization is a procedure to iteratively find the best possible match | ||
to | ||
.. math:: | ||
y = f(x). | ||
This module provides base classes for optimizer implementations. | ||
""" | ||
|
||
from concert.processes.base import Process | ||
from concert.asynchronous import async, dispatcher | ||
import logbook | ||
import numpy as np | ||
from concert.optimization import algorithms | ||
from concert.base import LimitError | ||
|
||
|
||
class Optimizer(Process): | ||
|
||
""" | ||
Base optimizer class. All necessary parameters are handled by it. | ||
The subclasses then implement their :py:meth:`is_better` methods, | ||
where an old value and new value are compared. | ||
""" | ||
FOUND = "optimum-found" | ||
"""The most abstract optimizer.""" | ||
FINISHED = "optimization-finished" | ||
|
||
def __init__(self, param, feedback, step, algorithm=algorithms.halve, | ||
max_iterations=100, epsilon=0.01): | ||
super(Optimizer, self).__init__([param]) | ||
self.param = param | ||
self.feedback = feedback | ||
self.algorithm = algorithm | ||
self.step = step | ||
self.max_iterations = max_iterations | ||
self.epsilon = epsilon | ||
self.value = None | ||
def __init__(self, function): | ||
""" | ||
Create an optimizer for a *function*. | ||
""" | ||
super(Optimizer, self).__init__(None) | ||
self.data = [] | ||
self.function = function | ||
self._logger = logbook.Logger(__name__ + "." + self.__class__.__name__) | ||
|
||
@async | ||
def run(self): | ||
"""Run the optimization algorithm.""" | ||
# Since step is a quantity, make a safe copy here for not changing | ||
# the original value (we want to reuse it by the next run. | ||
data = self.algorithm(self.param, self.feedback, self.cmp_set, | ||
np.copy(self.step) * self.step.units, | ||
self.epsilon, | ||
self.max_iterations) | ||
def evaluate(self, x): | ||
"""Execute y = f(*x*), save (x, y) pair and return y.""" | ||
y = self.function(x) | ||
self.data.append((x, y)) | ||
|
||
if len(data) < self.max_iterations: | ||
dispatcher.send(self, self.FOUND) | ||
return y | ||
|
||
return data | ||
def _optimize(self): | ||
"""Optimization executive code.""" | ||
raise NotImplementedError | ||
|
||
def cmp_set(self, new_value): | ||
@async | ||
def run(self): | ||
""" | ||
Return if the *new_value* is better than current value. Also set | ||
the object's value to be the *new_value*. | ||
run() | ||
Run the optimizer. | ||
""" | ||
is_better = self.is_better(new_value) | ||
self.value = new_value | ||
# Clear the saved values. | ||
self.data = [] | ||
|
||
return is_better | ||
self._optimize() | ||
|
||
def is_better(self, value): | ||
"""Return if the *value* is better than current value.""" | ||
raise NotImplementedError | ||
dispatcher.send(self, self.FINISHED) | ||
self._logger.debug("Optimization finished with x = {0}, y = {1}". | ||
format(self.data[0][-1], self.data[1][-1])) | ||
|
||
return tuple(self.data) | ||
|
||
|
||
class ParameterOptimizer(Optimizer): | ||
|
||
""" | ||
An optimizer based on a :py:class:`.Parameter` and a callable feedback. | ||
The function to optimize is created as :: | ||
def function(x): | ||
param.set(x).wait() | ||
return feedback() | ||
""" | ||
|
||
def __init__(self, param, feedback): | ||
"""Create an optimizer for parameter *param* and *feedback*.""" | ||
self.param = param | ||
self.feedback = feedback | ||
|
||
def function(x): | ||
try: | ||
self.param.set(x).wait() | ||
except LimitError: | ||
pass | ||
|
||
return self.feedback() | ||
|
||
super(ParameterOptimizer, self).__init__(function) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
from concert.optimization.base import ParameterOptimizer | ||
from concert.base import LimitError | ||
|
||
|
||
class Minimizer(ParameterOptimizer): | ||
|
||
"""Minimizer tries to minimize a function | ||
.. math:: | ||
y = f(x) | ||
.. py:attribute:: algorithm | ||
An algorithm which does the optimization, it is a callable. | ||
.. py:attribute:: alg_args | ||
A tuple of arguments passed to the algorithm | ||
.. py:attribute:: alg_kwargs | ||
A dictionary of keyword arguments passed to the algorithm | ||
The executive optimization function is then:: | ||
algorithm(x_guess, *alg_args, **alg_kwargs) | ||
where *x_guess* is the x value at which the optimizer starts. If | ||
*alg_args* is None, the *x_guess* is derived from the current | ||
parameter value, otherwise *x_guess* must be the first value in | ||
the *alg_args* list. | ||
""" | ||
|
||
def __init__(self, param, feedback, algorithm, alg_args=None, | ||
alg_kwargs=None): | ||
super(Minimizer, self).__init__(param, feedback) | ||
self.algorithm = algorithm | ||
self.alg_args = alg_args | ||
if not self.alg_args: | ||
self.alg_args = (param.get().result(), ) | ||
self.alg_kwargs = alg_kwargs | ||
if not self.alg_kwargs: | ||
self.alg_kwargs = {} | ||
|
||
def _optimize(self): | ||
result = self.algorithm(self.evaluate, *self.alg_args, | ||
**self.alg_kwargs) | ||
try: | ||
self.param.set(result).wait() | ||
except LimitError: | ||
self._logger.debug("Limit reached.") | ||
|
||
|
||
class Maximizer(Minimizer): | ||
|
||
""" | ||
The same as the :py:class:`.Minimizer` but with changed sign of the | ||
feedback, that is, if the function to minimize is | ||
.. math:: | ||
y = f(x) | ||
then the new function to maximize is | ||
.. math:: | ||
y = - f(x). | ||
""" | ||
|
||
def __init__(self, param, feedback, algorithm, alg_args=None, | ||
alg_kwargs=None): | ||
super(Maximizer, self).__init__(param, _change_sgn(feedback), | ||
algorithm, alg_args, alg_kwargs) | ||
|
||
|
||
def _change_sgn(feedback): | ||
""" | ||
Change feedback function sign, i.e. if y = f(x), then after | ||
applying this function y = -f(x). | ||
""" | ||
def wrapper(): | ||
return -feedback() | ||
|
||
return wrapper |
Oops, something went wrong.