Skip to content

Commit

Permalink
Merge pull request #712 from murrayrm/optimal_18Mar2022
Browse files Browse the repository at this point in the history
Optimal control enhancements
  • Loading branch information
murrayrm committed Mar 30, 2022
2 parents 2f3ca18 + f3d46bc commit cfe21de
Show file tree
Hide file tree
Showing 12 changed files with 321 additions and 86 deletions.
3 changes: 3 additions & 0 deletions control/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ def reset_defaults():
from .iosys import _iosys_defaults
defaults.update(_iosys_defaults)

from .optimal import _optimal_defaults
defaults.update(_optimal_defaults)


def _get_param(module, param, argval=None, defval=None, pop=False, last=False):
"""Return the default value for a configuration option.
Expand Down
126 changes: 80 additions & 46 deletions control/iosys.py
Original file line number Diff line number Diff line change
Expand Up @@ -1571,7 +1571,7 @@ def __init__(self, io_sys, ss_sys=None):
def input_output_response(
sys, T, U=0., X0=0, params={},
transpose=False, return_x=False, squeeze=None,
solve_ivp_kwargs={}, **kwargs):
solve_ivp_kwargs={}, t_eval='T', **kwargs):
"""Compute the output response of a system to a given input.
Simulate a dynamical system with a given input and return its output
Expand Down Expand Up @@ -1654,50 +1654,57 @@ def input_output_response(
if kwargs.get('solve_ivp_method', None):
if kwargs.get('method', None):
raise ValueError("ivp_method specified more than once")
solve_ivp_kwargs['method'] = kwargs['solve_ivp_method']
solve_ivp_kwargs['method'] = kwargs.pop('solve_ivp_method')

# Set the default method to 'RK45'
if solve_ivp_kwargs.get('method', None) is None:
solve_ivp_kwargs['method'] = 'RK45'

# Make sure all input arguments got parsed
if kwargs:
raise TypeError("unknown parameters %s" % kwargs)

# Sanity checking on the input
if not isinstance(sys, InputOutputSystem):
raise TypeError("System of type ", type(sys), " not valid")

# Compute the time interval and number of steps
T0, Tf = T[0], T[-1]
n_steps = len(T)
ntimepts = len(T)

# Figure out simulation times (t_eval)
if solve_ivp_kwargs.get('t_eval'):
if t_eval == 'T':
# Override the default with the solve_ivp keyword
t_eval = solve_ivp_kwargs.pop('t_eval')
else:
raise ValueError("t_eval specified more than once")
if isinstance(t_eval, str) and t_eval == 'T':
# Use the input time points as the output time points
t_eval = T

# Check and convert the input, if needed
# TODO: improve MIMO ninputs check (choose from U)
if sys.ninputs is None or sys.ninputs == 1:
legal_shapes = [(n_steps,), (1, n_steps)]
legal_shapes = [(ntimepts,), (1, ntimepts)]
else:
legal_shapes = [(sys.ninputs, n_steps)]
legal_shapes = [(sys.ninputs, ntimepts)]
U = _check_convert_array(U, legal_shapes,
'Parameter ``U``: ', squeeze=False)
U = U.reshape(-1, n_steps)

# Check to make sure this is not a static function
nstates = _find_size(sys.nstates, X0)
if nstates == 0:
# No states => map input to output
u = U[0] if len(U.shape) == 1 else U[:, 0]
y = np.zeros((np.shape(sys._out(T[0], X0, u))[0], len(T)))
for i in range(len(T)):
u = U[i] if len(U.shape) == 1 else U[:, i]
y[:, i] = sys._out(T[i], [], u)
return TimeResponseData(
T, y, None, U, issiso=sys.issiso(),
output_labels=sys.output_index, input_labels=sys.input_index,
transpose=transpose, return_x=return_x, squeeze=squeeze)
U = U.reshape(-1, ntimepts)
ninputs = U.shape[0]

# create X0 if not given, test if X0 has correct shape
nstates = _find_size(sys.nstates, X0)
X0 = _check_convert_array(X0, [(nstates,), (nstates, 1)],
'Parameter ``X0``: ', squeeze=True)

# Update the parameter values
sys._update_params(params)
# Figure out the number of outputs
if sys.noutputs is None:
# Evaluate the output function to find number of outputs
noutputs = np.shape(sys._out(T[0], X0, U[:, 0]))[0]
else:
noutputs = sys.noutputs

#
# Define a function to evaluate the input at an arbitrary time
Expand All @@ -1714,6 +1721,31 @@ def ufun(t):
dt = (t - T[idx-1]) / (T[idx] - T[idx-1])
return U[..., idx-1] * (1. - dt) + U[..., idx] * dt

# Check to make sure this is not a static function
if nstates == 0: # No states => map input to output
# Make sure the user gave a time vector for evaluation (or 'T')
if t_eval is None:
# User overrode t_eval with None, but didn't give us the times...
warn("t_eval set to None, but no dynamics; using T instead")
t_eval = T

# Allocate space for the inputs and outputs
u = np.zeros((ninputs, len(t_eval)))
y = np.zeros((noutputs, len(t_eval)))

# Compute the input and output at each point in time
for i, t in enumerate(t_eval):
u[:, i] = ufun(t)
y[:, i] = sys._out(t, [], u[:, i])

return TimeResponseData(
t_eval, y, None, u, issiso=sys.issiso(),
output_labels=sys.output_index, input_labels=sys.input_index,
transpose=transpose, return_x=return_x, squeeze=squeeze)

# Update the parameter values
sys._update_params(params)

# Create a lambda function for the right hand side
def ivp_rhs(t, x):
return sys._rhs(t, x, ufun(t))
Expand All @@ -1724,27 +1756,27 @@ def ivp_rhs(t, x):
raise NameError("scipy.integrate.solve_ivp not found; "
"use SciPy 1.0 or greater")
soln = sp.integrate.solve_ivp(
ivp_rhs, (T0, Tf), X0, t_eval=T,
ivp_rhs, (T0, Tf), X0, t_eval=t_eval,
vectorized=False, **solve_ivp_kwargs)
if not soln.success:
raise RuntimeError("solve_ivp failed: " + soln.message)

if not soln.success or soln.status != 0:
# Something went wrong
warn("sp.integrate.solve_ivp failed")
print("Return bunch:", soln)

# Compute the output associated with the state (and use sys.out to
# figure out the number of outputs just in case it wasn't specified)
u = U[0] if len(U.shape) == 1 else U[:, 0]
y = np.zeros((np.shape(sys._out(T[0], X0, u))[0], len(T)))
for i in range(len(T)):
u = U[i] if len(U.shape) == 1 else U[:, i]
y[:, i] = sys._out(T[i], soln.y[:, i], u)
# Compute inputs and outputs for each time point
u = np.zeros((ninputs, len(soln.t)))
y = np.zeros((noutputs, len(soln.t)))
for i, t in enumerate(soln.t):
u[:, i] = ufun(t)
y[:, i] = sys._out(t, soln.y[:, i], u[:, i])

elif isdtime(sys):
# If t_eval was not specified, use the sampling time
if t_eval is None:
t_eval = np.arange(T[0], T[1] + sys.dt, sys.dt)

# Make sure the time vector is uniformly spaced
dt = T[1] - T[0]
if not np.allclose(T[1:] - T[:-1], dt):
raise ValueError("Parameter ``T``: time values must be "
dt = t_eval[1] - t_eval[0]
if not np.allclose(t_eval[1:] - t_eval[:-1], dt):
raise ValueError("Parameter ``t_eval``: time values must be "
"equally spaced.")

# Make sure the sample time matches the given time
Expand All @@ -1764,21 +1796,23 @@ def ivp_rhs(t, x):

# Compute the solution
soln = sp.optimize.OptimizeResult()
soln.t = T # Store the time vector directly
x = X0 # Initilize state
soln.t = t_eval # Store the time vector directly
x = np.array(X0) # State vector (store as floats)
soln.y = [] # Solution, following scipy convention
y = [] # System output
for i in range(len(T)):
# Store the current state and output
u, y = [], [] # System input, output
for t in t_eval:
# Store the current input, state, and output
soln.y.append(x)
y.append(sys._out(T[i], x, ufun(T[i])))
u.append(ufun(t))
y.append(sys._out(t, x, u[-1]))

# Update the state for the next iteration
x = sys._rhs(T[i], x, ufun(T[i]))
x = sys._rhs(t, x, u[-1])

# Convert output to numpy arrays
soln.y = np.transpose(np.array(soln.y))
y = np.transpose(np.array(y))
u = np.transpose(np.array(u))

# Mark solution as successful
soln.success = True # No way to fail
Expand All @@ -1787,7 +1821,7 @@ def ivp_rhs(t, x):
raise TypeError("Can't determine system type")

return TimeResponseData(
soln.t, y, soln.y, U, issiso=sys.issiso(),
soln.t, y, soln.y, u, issiso=sys.issiso(),
output_labels=sys.output_index, input_labels=sys.input_index,
state_labels=sys.state_index,
transpose=transpose, return_x=return_x, squeeze=squeeze)
Expand Down
70 changes: 55 additions & 15 deletions control/optimal.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,21 @@
import logging
import time

from . import config
from .exception import ControlNotImplemented
from .timeresp import TimeResponseData

__all__ = ['find_optimal_input']

# Define module default parameter values
_optimal_defaults = {
'optimal.minimize_method': None,
'optimal.minimize_options': {},
'optimal.minimize_kwargs': {},
'optimal.solve_ivp_method': None,
'optimal.solve_ivp_options': {},
}


class OptimalControlProblem():
"""Description of a finite horizon, optimal control problem.
Expand Down Expand Up @@ -110,6 +121,10 @@ class OptimalControlProblem():
values of the input at the specified times (using linear interpolation for
continuous systems).
The default values for ``minimize_method``, ``minimize_options``,
``minimize_kwargs``, ``solve_ivp_method``, and ``solve_ivp_options`` can
be set using config.defaults['optimal.<keyword>'].
"""
def __init__(
self, sys, timepts, integral_cost, trajectory_constraints=[],
Expand All @@ -126,17 +141,22 @@ def __init__(

# Process keyword arguments
self.solve_ivp_kwargs = {}
self.solve_ivp_kwargs['method'] = kwargs.pop('solve_ivp_method', None)
self.solve_ivp_kwargs.update(kwargs.pop('solve_ivp_kwargs', {}))
self.solve_ivp_kwargs['method'] = kwargs.pop(
'solve_ivp_method', config.defaults['optimal.solve_ivp_method'])
self.solve_ivp_kwargs.update(kwargs.pop(
'solve_ivp_kwargs', config.defaults['optimal.solve_ivp_options']))

self.minimize_kwargs = {}
self.minimize_kwargs['method'] = kwargs.pop('minimize_method', None)
self.minimize_kwargs['options'] = kwargs.pop('minimize_options', {})
self.minimize_kwargs.update(kwargs.pop('minimize_kwargs', {}))
self.minimize_kwargs['method'] = kwargs.pop(
'minimize_method', config.defaults['optimal.minimize_method'])
self.minimize_kwargs['options'] = kwargs.pop(
'minimize_options', config.defaults['optimal.minimize_options'])
self.minimize_kwargs.update(kwargs.pop(
'minimize_kwargs', config.defaults['optimal.minimize_kwargs']))

if len(kwargs) > 0:
raise ValueError(
f'unrecognized keyword(s): {list(kwargs.keys())}')
# Make sure all input arguments got parsed
if kwargs:
raise TypeError("unrecognized keyword(s): ", str(kwargs))

# Process trajectory constraints
if isinstance(trajectory_constraints, tuple):
Expand Down Expand Up @@ -271,9 +291,10 @@ def _cost_function(self, coeffs):
logging.debug("initial input[0:3] =\n" + str(inputs[:, 0:3]))

# Simulate the system to get the state
# TODO: try calling solve_ivp directly for better speed?
_, _, states = ct.input_output_response(
self.system, self.timepts, inputs, x, return_x=True,
solve_ivp_kwargs=self.solve_ivp_kwargs)
solve_ivp_kwargs=self.solve_ivp_kwargs, t_eval=self.timepts)
self.system_simulations += 1
self.last_x = x
self.last_coeffs = coeffs
Expand Down Expand Up @@ -393,7 +414,7 @@ def _constraint_function(self, coeffs):
# Simulate the system to get the state
_, _, states = ct.input_output_response(
self.system, self.timepts, inputs, x, return_x=True,
solve_ivp_kwargs=self.solve_ivp_kwargs)
solve_ivp_kwargs=self.solve_ivp_kwargs, t_eval=self.timepts)
self.system_simulations += 1
self.last_x = x
self.last_coeffs = coeffs
Expand Down Expand Up @@ -475,7 +496,7 @@ def _eqconst_function(self, coeffs):
# Simulate the system to get the state
_, _, states = ct.input_output_response(
self.system, self.timepts, inputs, x, return_x=True,
solve_ivp_kwargs=self.solve_ivp_kwargs)
solve_ivp_kwargs=self.solve_ivp_kwargs, t_eval=self.timepts)
self.system_simulations += 1
self.last_x = x
self.last_coeffs = coeffs
Expand Down Expand Up @@ -548,7 +569,7 @@ def _process_initial_guess(self, initial_guess):
initial_guess = np.atleast_1d(initial_guess)

# See whether we got entire guess or just first time point
if len(initial_guess.shape) == 1:
if initial_guess.ndim == 1:
# Broadcast inputs to entire time vector
try:
initial_guess = np.broadcast_to(
Expand Down Expand Up @@ -804,6 +825,15 @@ class OptimalControlResult(sp.optimize.OptimizeResult):
Whether or not the optimizer exited successful.
problem : OptimalControlProblem
Optimal control problem that generated this solution.
cost : float
Final cost of the return solution.
system_simulations, {cost, constraint, eqconst}_evaluations : int
Number of system simulations and evaluations of the cost function,
(inequality) constraint function, and equality constraint function
performed during the optimzation.
{cost, constraint, eqconst}_process_time : float
If logging was enabled, the amount of time spent evaluating the cost
and constraint functions.
"""
def __init__(
Expand Down Expand Up @@ -833,15 +863,19 @@ def __init__(
"unable to solve optimal control problem\n"
"scipy.optimize.minimize returned " + res.message, UserWarning)

# Save the final cost
self.cost = res.fun

# Optionally print summary information
if print_summary:
ocp._print_statistics()
print("* Final cost:", self.cost)

if return_states and inputs.shape[1] == ocp.timepts.shape[0]:
# Simulate the system if we need the state back
_, _, states = ct.input_output_response(
ocp.system, ocp.timepts, inputs, ocp.x, return_x=True,
solve_ivp_kwargs=ocp.solve_ivp_kwargs)
solve_ivp_kwargs=ocp.solve_ivp_kwargs, t_eval=ocp.timepts)
ocp.system_simulations += 1
else:
states = None
Expand All @@ -858,7 +892,7 @@ def __init__(

# Compute the input for a nonlinear, (constrained) optimal control problem
def solve_ocp(
sys, horizon, X0, cost, trajectory_constraints=[], terminal_cost=None,
sys, horizon, X0, cost, trajectory_constraints=None, terminal_cost=None,
terminal_constraints=[], initial_guess=None, basis=None, squeeze=None,
transpose=None, return_states=False, log=False, **kwargs):

Expand Down Expand Up @@ -960,12 +994,18 @@ def solve_ocp(
# Process keyword arguments
if trajectory_constraints is None:
# Backwards compatibility
trajectory_constraints = kwargs.pop('constraints', None)
trajectory_constraints = kwargs.pop('constraints', [])

# Allow 'return_x` as a synonym for 'return_states'
return_states = ct.config._get_param(
'optimal', 'return_x', kwargs, return_states, pop=True)

# Process (legacy) method keyword
if kwargs.get('method'):
if kwargs.get('minimize_method'):
raise ValueError("'minimize_method' specified more than once")
kwargs['minimize_method'] = kwargs.pop('method')

# Set up the optimal control problem
ocp = OptimalControlProblem(
sys, horizon, cost, trajectory_constraints=trajectory_constraints,
Expand Down
2 changes: 1 addition & 1 deletion control/statefbk.py
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,7 @@ def dlqr(*args, **kwargs):
The dlqr() function computes the optimal state feedback controller
u[n] = - K x[n] that minimizes the quadratic cost
.. math:: J = \\Sum_0^\\infty (x[n]' Q x[n] + u[n]' R u[n] + 2 x[n]' N u[n])
.. math:: J = \\sum_0^\\infty (x[n]' Q x[n] + u[n]' R u[n] + 2 x[n]' N u[n])
The function can be called with either 3, 4, or 5 arguments:
Expand Down

0 comments on commit cfe21de

Please sign in to comment.