Skip to content

Commit

Permalink
Merge 094a4e2 into 01246ff
Browse files Browse the repository at this point in the history
  • Loading branch information
Stéphane Caron committed Jul 26, 2023
2 parents 01246ff + 094a4e2 commit 1008a5f
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 108 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ All notable changes to this project will be documented in this file.
- Exception: ``ProblemDefinitionError``
- MPCProblem: setter for the initial state
- MPCProblem: target state trajectory for stage state cost
- MPCQP class to update cost vectors during execution
- Plan: ``first_input`` getter
- Plan: ``is_empty`` property
- Started ``ltv_mpc.live_plots`` submodule
Expand All @@ -21,11 +22,11 @@ All notable changes to this project will be documented in this file.
- Always add state-input inequalities (even when unfeasible)
- Don't assume a ``Problem`` is fully defined in constructor
- Initial and goal states are now keyword arguments of ``MPCProblem``
- Refactor``build_qp`` into a new ``MPCQP`` class
- Rename ``Problem`` to ``MPCProblem``
- Rename ``Solution`` to ``Plan``
- Rename ``stacked_inputs`` to just ``inputs`` in plans
- Rename ``stacked_states`` to just ``states`` in plans
- Rename ``build_qp`` to ``build_mpc_qp``
- Solver keyword argument to ``solve_mpc`` is now mandatory
- Use problem class from ``qpsolvers`` internally
- Warn rather than raise an exception when initial state is unfeasible
Expand Down
4 changes: 2 additions & 2 deletions doc/src/developer-notes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ Developer notes
Quadratic program
=================

Internally, :func:`.solve_mpc` builds a quadratic program before calling a QP solver. You can retrieve the QP corresponding to the input problem by calling the internal :func:`.build_mpc_qp` function:
Internally, :func:`.solve_mpc` builds a quadratic program before calling a QP solver. You can retrieve the QP corresponding to the input problem by creating an instance of the intermediate :func:`.MPCQP` representation:

.. autofunction:: ltv_mpc.solve_mpc.build_mpc_qp
.. autoclass:: ltv_mpc.mpc_qp.MPCQP
5 changes: 3 additions & 2 deletions ltv_mpc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@
"""Linear time-variant model predictive control in Python."""

from .mpc_problem import MPCProblem
from .mpc_qp import MPCQP
from .plan import Plan
from .solve_mpc import build_mpc_qp, solve_mpc
from .solve_mpc import solve_mpc

__all__ = [
"MPCProblem",
"MPCQP",
"Plan",
"build_mpc_qp",
"solve_mpc",
]

Expand Down
28 changes: 28 additions & 0 deletions ltv_mpc/mpc_problem.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,34 @@ def __init__(
if initial_state is not None:
self.update_initial_state(initial_state)

@property
def has_terminal_cost(self) -> bool:
"""Check whether the problem has a terminal cost."""
cost_is_set = (
self.terminal_cost_weight is not None
and self.terminal_cost_weight > 1e-10
)
if cost_is_set and self.goal_state is None:
raise ProblemDefinitionError(
"MPC problem has terminal cost "
"but the goal state is undefined"
)
return cost_is_set

@property
def has_stage_state_cost(self) -> bool:
"""Check whether the problem has a stage state cost."""
cost_is_set = (
self.stage_state_cost_weight is not None
and self.stage_state_cost_weight > 1e-10
)
if cost_is_set and self.target_states is None:
raise ProblemDefinitionError(
"MPC problem has a stage state cost "
"but the reference trajectory is undefined"
)
return cost_is_set

def get_transition_state_matrix(self, k) -> np.ndarray:
"""Get state-transition matrix at a given timestep.
Expand Down
165 changes: 165 additions & 0 deletions ltv_mpc/mpc_qp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2023 Inria
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""MPC problem represented as a quadratic program."""

from logging import warn

import numpy as np
import qpsolvers
from scipy.sparse import csc_matrix

from .exceptions import ProblemDefinitionError
from .mpc_problem import MPCProblem


class MPCQP:
r"""MPC problem represented as a quadratic program.
This class further stores intermediate matrices used to recompute cost and
linear inequality vectors.
"""

G: np.ndarray
P: np.ndarray
Phi: np.ndarray
Psi: np.ndarray
h: np.ndarray
phi_last: np.ndarray
psi_last: np.ndarray
q: np.ndarray

def __init__(self, mpc_problem: MPCProblem, sparse: bool = False) -> None:
"""Create a new QP representation.
Args:
mpc_problem: Model predictive control problem to cast as a QP.
sparse: If set, use sparse matrix representation.
"""
input_dim = mpc_problem.input_dim
state_dim = mpc_problem.state_dim
stacked_input_dim = mpc_problem.input_dim * mpc_problem.nb_timesteps
if mpc_problem.initial_state is None:
raise ProblemDefinitionError("initial state is undefined")
initial_state: np.ndarray = mpc_problem.initial_state

phi = np.eye(state_dim)
psi = np.zeros((state_dim, stacked_input_dim))
G_list, h_list = [], []
phi_list, psi_list = [], []
for k in range(mpc_problem.nb_timesteps):
# Loop invariant: x == psi * U + phi * x_init
phi_list.append(phi)
psi_list.append(psi)
A_k = mpc_problem.get_transition_state_matrix(k)
B_k = mpc_problem.get_transition_input_matrix(k)
C_k = mpc_problem.get_ineq_state_matrix(k)
D_k = mpc_problem.get_ineq_input_matrix(k)
e_k = mpc_problem.get_ineq_vector(k)
G_k = np.zeros((e_k.shape[0], stacked_input_dim))
h_k = (
e_k
if C_k is None
else e_k - np.dot(C_k.dot(phi), initial_state)
)
input_slice = slice(k * input_dim, (k + 1) * input_dim)
if D_k is not None:
# we rely on G == 0 to avoid a slower +=
G_k[:, input_slice] = D_k
if C_k is not None:
G_k += C_k.dot(psi)
if k == 0 and D_k is None and np.any(h_k < 0.0):
# in this case, the initial state constraint is violated and
# cannot be compensated by any input (D_k is None)
warn(
"initial state is unfeasible: "
f"G_0 * x <= h_0 with G_0 == 0 and min(h_0) == {min(h_k)}"
)
G_list.append(G_k)
h_list.append(h_k)
phi = A_k.dot(phi)
psi = A_k.dot(psi)
psi[:, input_slice] = B_k
G: np.ndarray = np.vstack(G_list)
h: np.ndarray = np.hstack(h_list)
Phi = np.vstack(phi_list)
Psi = np.vstack(psi_list)

P: np.ndarray = mpc_problem.stage_input_cost_weight * np.eye(
stacked_input_dim,
)
q: np.ndarray = np.zeros(stacked_input_dim)
if mpc_problem.has_terminal_cost:
if mpc_problem.goal_state is None:
raise ProblemDefinitionError("goal state is undefined")
P += mpc_problem.terminal_cost_weight * np.dot(psi.T, psi)
if mpc_problem.has_stage_state_cost:
if mpc_problem.target_states is None:
raise ProblemDefinitionError(
"reference trajectory is undefined"
)
P += mpc_problem.stage_state_cost_weight * np.dot(Psi.T, Psi)

self.P = csc_matrix(P) if sparse else P
self.G = csc_matrix(G) if sparse else G
self.Phi = Phi
self.Psi = Psi
self.h = h
self.phi_last = phi
self.psi_last = psi
self.q = q # initialized below
#
self.update_cost_vector(mpc_problem)

@property
def problem(self) -> qpsolvers.Problem:
"""Get quadratic program to call a QP solver."""
return qpsolvers.Problem(self.P, self.q, self.G, self.h)

def update_cost_vector(self, mpc_problem: MPCProblem) -> None:
"""Update the gradient vector in the cost function.
Args:
mpc_problem: New model predictive control problem. It should have
the same structure as the one used to initialize the MPCQP.
"""
if mpc_problem.initial_state is None:
raise ProblemDefinitionError("initial state is undefined")
initial_state = mpc_problem.initial_state
goal_state = mpc_problem.goal_state
self.q[:] = 0.0
if mpc_problem.has_terminal_cost:
c = np.dot(self.phi_last, initial_state) - goal_state
self.q += mpc_problem.terminal_cost_weight * np.dot(
c.T, self.psi_last
)
if mpc_problem.has_stage_state_cost:
c = np.dot(self.Phi, initial_state) - mpc_problem.target_states
self.q += mpc_problem.stage_state_cost_weight * np.dot(
c.T, self.Psi
)

def update_constraint_vector(self, mpc_problem: MPCProblem) -> None:
"""Update the inequality constraint vector.
Args:
mpc_problem: New model predictive control problem. It should have
the same structure as the one used to initialize the MPCQP.
"""
raise NotImplementedError(
"Time-varying constraints are handled cold-start for now"
)
103 changes: 3 additions & 100 deletions ltv_mpc/solve_mpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,110 +17,13 @@

"""Solve model predictive control problems."""

from logging import warn

import numpy as np
import qpsolvers
from qpsolvers import solve_problem
from scipy.sparse import csc_matrix

from .exceptions import ProblemDefinitionError
from .mpc_problem import MPCProblem
from .mpc_qp import MPCQP
from .plan import Plan


def build_mpc_qp(
problem: MPCProblem,
sparse: bool = False,
) -> qpsolvers.Problem:
"""Build the quadratic program corresponding to an MPC problem.
Args:
problem: Model predictive control problem.
sparse: Whether to use sparse or dense matrices in the output quadratic
program. Enable it if you are calling a sparse solver afterwards.
Returns:
Quadratic program representing the MPC problem.
Notes:
The QP built by this function implements a `single shooting method
<https://en.wikipedia.org/wiki/Shooting_method>`_.
"""
input_dim = problem.input_dim
state_dim = problem.state_dim
stacked_input_dim = problem.input_dim * problem.nb_timesteps
if problem.initial_state is None:
raise ProblemDefinitionError("initial state is undefined")
initial_state: np.ndarray = problem.initial_state

phi = np.eye(state_dim)
psi = np.zeros((state_dim, stacked_input_dim))
G_list, h_list = [], []
phi_list, psi_list = [], []
for k in range(problem.nb_timesteps):
# Loop invariant: x == psi * U + phi * x_init
if problem.stage_state_cost_weight is not None:
phi_list.append(phi)
psi_list.append(psi)
A_k = problem.get_transition_state_matrix(k)
B_k = problem.get_transition_input_matrix(k)
C_k = problem.get_ineq_state_matrix(k)
D_k = problem.get_ineq_input_matrix(k)
e_k = problem.get_ineq_vector(k)
G_k = np.zeros((e_k.shape[0], stacked_input_dim))
h_k = e_k if C_k is None else e_k - np.dot(C_k.dot(phi), initial_state)
input_slice = slice(k * input_dim, (k + 1) * input_dim)
if D_k is not None:
# we rely on G == 0 to avoid a slower +=
G_k[:, input_slice] = D_k
if C_k is not None:
G_k += C_k.dot(psi)
if k == 0 and D_k is None and np.any(h_k < 0.0):
# in this case, the initial state constraint is violated and cannot
# be compensated by any input (D_k is None)
warn(
"initial state is unfeasible: "
f"G_0 * x <= h_0 with G_0 == 0 and min(h_0) == {min(h_k)}"
)
G_list.append(G_k)
h_list.append(h_k)
phi = A_k.dot(phi)
psi = A_k.dot(psi)
psi[:, input_slice] = B_k

P: np.ndarray = problem.stage_input_cost_weight * np.eye(
stacked_input_dim,
)
q: np.ndarray = np.zeros(stacked_input_dim)
if (
problem.terminal_cost_weight is not None
and problem.terminal_cost_weight > 1e-10
):
if problem.goal_state is None:
raise ProblemDefinitionError("goal state is undefined")
c = np.dot(phi, initial_state) - problem.goal_state
P += problem.terminal_cost_weight * np.dot(psi.T, psi)
q += problem.terminal_cost_weight * np.dot(c.T, psi)
if (
problem.stage_state_cost_weight is not None
and problem.stage_state_cost_weight > 1e-10
):
if problem.target_states is None:
raise ProblemDefinitionError("reference trajectory is undefined")
Phi = np.vstack(phi_list)
Psi = np.vstack(psi_list)
c = np.dot(Phi, initial_state) - problem.target_states
P += problem.stage_state_cost_weight * np.dot(Psi.T, Psi)
q += problem.stage_state_cost_weight * np.dot(c.T, Psi)

G: np.ndarray = np.vstack(G_list)
h: np.ndarray = np.hstack(h_list)
if sparse:
return qpsolvers.Problem(csc_matrix(P), q, csc_matrix(G), h)
return qpsolvers.Problem(P, q, G, h)


def solve_mpc(
problem: MPCProblem,
solver: str,
Expand All @@ -147,6 +50,6 @@ def solve_mpc(
.. _solve_qp:
https://qpsolvers.github.io/qpsolvers/quadratic-programming.html#qpsolvers.solve_qp
"""
qp = build_mpc_qp(problem, sparse=sparse)
qpsol = solve_problem(qp, solver=solver, **kwargs)
mpc_qp = MPCQP(problem, sparse=sparse)
qpsol = solve_problem(mpc_qp.problem, solver=solver, **kwargs)
return Plan(problem, qpsol)
7 changes: 4 additions & 3 deletions tests/test_humanoid_one_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import numpy as np

from ltv_mpc import MPCProblem, solve_mpc
from ltv_mpc.solve_mpc import build_mpc_qp
from ltv_mpc.solve_mpc import MPCQP

gravity = 9.81 # [m] / [s]^2

Expand Down Expand Up @@ -87,8 +87,9 @@ def setUp(self):
self.stepping_problem = problem
self.problem = mpc_problem

def test_build_mpc_qp(self):
qp = build_mpc_qp(self.problem)
def test_mpc_qp_first_constraints(self):
mpc_qp = MPCQP(self.problem)
qp = mpc_qp.problem
self.assertAlmostEqual(np.linalg.norm(qp.G[0:2]), 0.0)

def test_solve_mpc(self):
Expand Down

0 comments on commit 1008a5f

Please sign in to comment.