diff --git a/bioptim/interfaces/interface_utils.py b/bioptim/interfaces/interface_utils.py index e95807f66..95ca5aecc 100644 --- a/bioptim/interfaces/interface_utils.py +++ b/bioptim/interfaces/interface_utils.py @@ -1,4 +1,5 @@ from time import perf_counter +from typing import Callable from sys import platform from casadi import Importer @@ -174,513 +175,19 @@ def generic_get_all_penalties(interface, nlp: NonLinearProgram, penalties, is_un Parameters ---------- + interface: + A reference to the current interface nlp: NonLinearProgram The nonlinear program to parse the penalties from penalties: The penalties to parse + is_unscaled: bool + If the penalty is unscaled or scaled Returns ------- - + TODO """ - def format_target(target_in: np.array) -> np.array: - """ - Format the target of a penalty to a numpy array - - Parameters - ---------- - target_in: np.array - The target of the penalty - Returns - ------- - np.array - The target of the penalty formatted to a numpy array - """ - if len(target_in.shape) == 2: - target_out = target_in[:, penalty.node_idx.index(idx)] - elif len(target_in.shape) == 3: - target_out = target_in[:, :, penalty.node_idx.index(idx)] - else: - raise NotImplementedError("penalty target with dimension != 2 or 3 is not implemented yet") - return target_out - - def get_x_and_u_at_idx(_penalty, _idx, is_unscaled): - """ """ - - def get_control_modificator(index): - return ( - 1 - if ocp.nlp[_penalty.nodes_phase[index]].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE - and ( - _penalty.nodes[index] == Node.END - or _penalty.nodes[index] == ocp.nlp[_penalty.nodes_phase[index]].ns - ) - else 0 - ) - - if _penalty.transition: - ocp = interface.ocp - - u0_mode = get_control_modificator(0) - u1_mode = get_control_modificator(1) - - if is_unscaled: - if ( - interface.ocp.nlp[_penalty.nodes_phase[1]].X[0][:, 0].shape[0] - > interface.ocp.nlp[_penalty.nodes_phase[0]].X[0][:, 0].shape[0] - ): - fake = interface.ocp.cx( - interface.ocp.nlp[_penalty.nodes_phase[1]].X[0][:, 0].shape[0] - - interface.ocp.nlp[_penalty.nodes_phase[0]].X[0][:, 0].shape[0], - 1, - ) - _x_0 = vertcat( - interface.ocp.nlp[_penalty.nodes_phase[0]].X[_penalty.all_nodes_index[0]][:, 0], - fake, - ) - else: - _x_0 = interface.ocp.nlp[_penalty.nodes_phase[0]].X[_penalty.all_nodes_index[0]][:, 0] - if ( - interface.ocp.nlp[_penalty.nodes_phase[0]].X[0][:, 0].shape[0] - > interface.ocp.nlp[_penalty.nodes_phase[1]].X[0][:, 0].shape[0] - ): - fake = interface.ocp.cx( - interface.ocp.nlp[_penalty.nodes_phase[0]].X[0][:, 0].shape[0] - - interface.ocp.nlp[_penalty.nodes_phase[1]].X[0][:, 0].shape[0], - 1, - ) - _x_1 = vertcat( - interface.ocp.nlp[_penalty.nodes_phase[1]].X[_penalty.all_nodes_index[1]][:, 0], - fake, - ) - else: - _x_1 = interface.ocp.nlp[_penalty.nodes_phase[1]].X[_penalty.all_nodes_index[1]][:, 0] - - if interface.ocp.nlp[ - _penalty.nodes_phase[0] - ].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _penalty.all_nodes_index[0] < len( - interface.ocp.nlp[0].U - ): - if ( - interface.ocp.nlp[_penalty.nodes_phase[1]].U[0].shape[0] - > interface.ocp.nlp[_penalty.nodes_phase[0]].U[0].shape[0] - ) and ( - _penalty.all_nodes_index[1] < len(interface.ocp.nlp[_penalty.nodes_phase[1]].U_scaled) - or interface.ocp.nlp[_penalty.nodes_phase[1]].phase_dynamics - == PhaseDynamics.SHARED_DURING_THE_PHASE - ): - fake = interface.ocp.cx( - interface.ocp.nlp[_penalty.nodes_phase[1]].U[0].shape[0] - - interface.ocp.nlp[_penalty.nodes_phase[0]].U[0].shape[0], - 1, - ) - _u_0 = vertcat( - interface.ocp.nlp[_penalty.nodes_phase[0]].U[_penalty.all_nodes_index[0] - u0_mode], - fake, - ) - else: - _u_0 = interface.ocp.nlp[_penalty.nodes_phase[0]].U[_penalty.all_nodes_index[0] - u0_mode] - else: - _u_0 = [] - if interface.ocp.nlp[ - _penalty.nodes_phase[1] - ].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _penalty.all_nodes_index[1] < len( - interface.ocp.nlp[_penalty.nodes_phase[1]].U - ): - if ( - interface.ocp.nlp[_penalty.nodes_phase[0]].U[0].shape[0] - > interface.ocp.nlp[_penalty.nodes_phase[1]].U[0].shape[0] - ) and ( - _penalty.all_nodes_index[0] < len(interface.ocp.nlp[_penalty.nodes_phase[0]].U_scaled) - or interface.ocp.nlp[_penalty.nodes_phase[0]].phase_dynamics - == PhaseDynamics.SHARED_DURING_THE_PHASE - ): - fake = interface.ocp.cx( - interface.ocp.nlp[_penalty.nodes_phase[0]].U[0].shape[0] - - interface.ocp.nlp[_penalty.nodes_phase[1]].U[0].shape[0], - 1, - ) - _u_1 = vertcat( - interface.ocp.nlp[_penalty.nodes_phase[1]].U[_penalty.all_nodes_index[1] - u1_mode], - fake, - ) - else: - _u_1 = interface.ocp.nlp[_penalty.nodes_phase[1]].U[_penalty.all_nodes_index[1] - u1_mode] - else: - _u_1 = [] - - if ( - interface.ocp.nlp[_penalty.nodes_phase[1]].S[0].shape[0] - > interface.ocp.nlp[_penalty.nodes_phase[0]].S[0].shape[0] - ): - fake = interface.ocp.cx( - interface.ocp.nlp[_penalty.nodes_phase[1]].S[0].shape[0] - - interface.ocp.nlp[_penalty.nodes_phase[0]].S[0].shape[0], - 1, - ) - _s_0 = vertcat( - interface.ocp.nlp[_penalty.nodes_phase[0]].S[_penalty.all_nodes_index[0]], - fake, - ) - else: - _s_0 = interface.ocp.nlp[_penalty.nodes_phase[0]].S[_penalty.all_nodes_index[0]] - if ( - interface.ocp.nlp[_penalty.nodes_phase[0]].S[0].shape[0] - > interface.ocp.nlp[_penalty.nodes_phase[1]].S[0].shape[0] - ): - fake = interface.ocp.cx( - interface.ocp.nlp[_penalty.nodes_phase[0]].S[0].shape[0] - - interface.ocp.nlp[_penalty.nodes_phase[1]].S[0].shape[0], - 1, - ) - _s_1 = vertcat( - interface.ocp.nlp[_penalty.nodes_phase[1]].S[_penalty.all_nodes_index[1]], - fake, - ) - else: - _s_1 = interface.ocp.nlp[_penalty.nodes_phase[1]].S[_penalty.all_nodes_index[1]] - - else: - if ( - interface.ocp.nlp[_penalty.nodes_phase[1]].X_scaled[0][:, 0].shape[0] - > interface.ocp.nlp[_penalty.nodes_phase[0]].X_scaled[0][:, 0].shape[0] - ): - fake = interface.ocp.cx( - interface.ocp.nlp[_penalty.nodes_phase[1]].X_scaled[0][:, 0].shape[0] - - interface.ocp.nlp[_penalty.nodes_phase[0]].X_scaled[0][:, 0].shape[0], - 1, - ) - _x_0 = vertcat( - interface.ocp.nlp[_penalty.nodes_phase[0]].X_scaled[_penalty.all_nodes_index[0]][:, 0], - fake, - ) - else: - _x_0 = interface.ocp.nlp[_penalty.nodes_phase[0]].X_scaled[_penalty.all_nodes_index[0]][:, 0] - if ( - interface.ocp.nlp[_penalty.nodes_phase[0]].X_scaled[0][:, 0].shape[0] - > interface.ocp.nlp[_penalty.nodes_phase[1]].X_scaled[0][:, 0].shape[0] - ): - fake = interface.ocp.cx( - interface.ocp.nlp[_penalty.nodes_phase[0]].X_scaled[0][:, 0].shape[0] - - interface.ocp.nlp[_penalty.nodes_phase[1]].X_scaled[0][:, 0].shape[0], - 1, - ) - _x_1 = vertcat( - interface.ocp.nlp[_penalty.nodes_phase[1]].X_scaled[_penalty.all_nodes_index[1]][:, 0], - fake, - ) - else: - _x_1 = interface.ocp.nlp[_penalty.nodes_phase[1]].X_scaled[_penalty.all_nodes_index[1]][:, 0] - - if interface.ocp.nlp[ - _penalty.nodes_phase[0] - ].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _penalty.all_nodes_index[0] < len( - interface.ocp.nlp[_penalty.nodes_phase[0]].U_scaled - ): - if ( - interface.ocp.nlp[_penalty.nodes_phase[1]].U_scaled[0].shape[0] - > interface.ocp.nlp[_penalty.nodes_phase[0]].U_scaled[0].shape[0] - ) and ( - _penalty.all_nodes_index[1] < len(interface.ocp.nlp[_penalty.nodes_phase[1]].U_scaled) - or interface.ocp.nlp[_penalty.nodes_phase[1]].phase_dynamics - == PhaseDynamics.SHARED_DURING_THE_PHASE - ): - fake = interface.ocp.cx( - interface.ocp.nlp[_penalty.nodes_phase[1]].U_scaled[0].shape[0] - - interface.ocp.nlp[_penalty.nodes_phase[0]].U_scaled[0].shape[0], - 1, - ) - _u_0 = vertcat( - interface.ocp.nlp[_penalty.nodes_phase[0]].U_scaled[_penalty.all_nodes_index[0] - u0_mode], - fake, - ) - else: - _u_0 = interface.ocp.nlp[_penalty.nodes_phase[0]].U_scaled[ - _penalty.all_nodes_index[0] - u0_mode - ] - else: - _u_0 = [] - if interface.ocp.nlp[ - _penalty.nodes_phase[1] - ].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _penalty.all_nodes_index[1] < len( - interface.ocp.nlp[_penalty.nodes_phase[1]].U_scaled - ): - if ( - interface.ocp.nlp[_penalty.nodes_phase[0]].U_scaled[0].shape[0] - > interface.ocp.nlp[_penalty.nodes_phase[1]].U_scaled[0].shape[0] - ) and ( - _penalty.all_nodes_index[0] < len(interface.ocp.nlp[_penalty.nodes_phase[0]].U_scaled) - or interface.ocp.nlp[_penalty.nodes_phase[0]].phase_dynamics - == PhaseDynamics.SHARED_DURING_THE_PHASE - ): - fake = interface.ocp.cx( - interface.ocp.nlp[_penalty.nodes_phase[0]].U_scaled[0].shape[0] - - interface.ocp.nlp[_penalty.nodes_phase[1]].U_scaled[0].shape[0], - 1, - ) - _u_1 = vertcat( - interface.ocp.nlp[_penalty.nodes_phase[1]].U_scaled[_penalty.all_nodes_index[1] - u1_mode], - fake, - ) - else: - _u_1 = interface.ocp.nlp[_penalty.nodes_phase[1]].U_scaled[ - _penalty.all_nodes_index[1] - u1_mode - ] - else: - _u_1 = [] - - if ( - interface.ocp.nlp[_penalty.nodes_phase[1]].S_scaled[0].shape[0] - > interface.ocp.nlp[_penalty.nodes_phase[0]].S_scaled[0].shape[0] - ): - fake = interface.ocp.cx( - interface.ocp.nlp[_penalty.nodes_phase[1]].S_scaled[0].shape[0] - - interface.ocp.nlp[_penalty.nodes_phase[0]].S_scaled[0].shape[0], - 1, - ) - _s_0 = vertcat( - interface.ocp.nlp[_penalty.nodes_phase[0]].S_scaled[_penalty.all_nodes_index[0]], - fake, - ) - else: - _s_0 = interface.ocp.nlp[_penalty.nodes_phase[0]].S_scaled[_penalty.all_nodes_index[0]] - if ( - interface.ocp.nlp[_penalty.nodes_phase[0]].S_scaled[0].shape[0] - > interface.ocp.nlp[_penalty.nodes_phase[1]].S_scaled[0].shape[0] - ): - fake = interface.ocp.cx( - interface.ocp.nlp[_penalty.nodes_phase[0]].S_scaled[0].shape[0] - - interface.ocp.nlp[_penalty.nodes_phase[1]].S_scaled[0].shape[0], - 1, - ) - _s_1 = vertcat( - interface.ocp.nlp[_penalty.nodes_phase[1]].S_scaled[_penalty.all_nodes_index[1]], - fake, - ) - else: - _s_1 = interface.ocp.nlp[_penalty.nodes_phase[1]].S_scaled[_penalty.all_nodes_index[1]] - - _x = vertcat(_x_1, _x_0) - _u = vertcat(_u_1, _u_0) - _s = vertcat(_s_1, _s_0) - - elif _penalty.multinode_penalty: - ocp = interface.ocp - - # Make an exception to the fact that U is not available for the last node - _x = ocp.cx() - _u = ocp.cx() - _s = ocp.cx() - for i in range(len(_penalty.nodes_phase)): - nlp_i = ocp.nlp[_penalty.nodes_phase[i]] - index_i = _penalty.multinode_idx[i] - ui_mode = get_control_modificator(i) - - if is_unscaled: - _x_tp = nlp_i.cx() - if _penalty.integration_rule == QuadratureRule.APPROXIMATE_TRAPEZOIDAL: - _x_tp = vertcat(_x_tp, nlp_i.X[index_i][:, 0]) - else: - for i in range(nlp_i.X[index_i].shape[1]): - _x_tp = vertcat(_x_tp, nlp_i.X[index_i][:, i]) - _u_tp = ( - nlp_i.U[index_i - ui_mode] - if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or index_i < len(nlp_i.U) - else [] - ) - _s_tp = nlp_i.S[index_i] - else: - _x_tp = nlp_i.cx() - if _penalty.integration_rule == QuadratureRule.APPROXIMATE_TRAPEZOIDAL: - _x_tp = vertcat(_x_tp, nlp_i.X_scaled[index_i][:, 0]) - else: - for i in range(nlp_i.X_scaled[index_i].shape[1]): - _x_tp = vertcat(_x_tp, nlp_i.X_scaled[index_i][:, i]) - _u_tp = ( - nlp_i.U_scaled[index_i - ui_mode] - if nlp_i.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE - or index_i < len(nlp_i.U_scaled) - else [] - ) - _s_tp = nlp_i.S_scaled[index_i] - - _x = vertcat(_x, _x_tp) - _u = vertcat(_u, _u_tp) - _s = vertcat(_s, _s_tp) - - elif _penalty.integrate: - if is_unscaled: - _x = nlp.cx() - for i in range(nlp.X[_idx].shape[1]): - _x = vertcat(_x, nlp.X[_idx][:, i]) - _u = ( - nlp.U[_idx][:, 0] - if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _idx < len(nlp.U) - else [] - ) - _s = nlp.S[_idx] - else: - _x = nlp.cx() - for i in range(nlp.X_scaled[_idx].shape[1]): - _x = vertcat(_x, nlp.X_scaled[_idx][:, i]) - _u = ( - nlp.U_scaled[_idx][:, 0] - if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _idx < len(nlp.U_scaled) - else [] - ) - _s = nlp.S_scaled[_idx] - else: - if is_unscaled: - _x = nlp.cx() - if ( - _penalty.integration_rule == QuadratureRule.APPROXIMATE_TRAPEZOIDAL - or _penalty.integration_rule == QuadratureRule.TRAPEZOIDAL - ): - _x = vertcat(_x, nlp.X[_idx][:, 0]) - else: - for i in range(nlp.X[_idx].shape[1]): - _x = vertcat(_x, nlp.X[_idx][:, i]) - - # Watch out, this is ok for all of our current built-in functions, but it is not generally ok to do that - if ( - _idx == nlp.ns - and nlp.ode_solver.is_direct_collocation - and nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE - and _penalty.node[0] != Node.END - and _penalty.integration_rule != QuadratureRule.APPROXIMATE_TRAPEZOIDAL - ): - for i in range(1, nlp.X[_idx - 1].shape[1]): - _x = vertcat(_x, nlp.X[_idx - 1][:, i]) - - _u = ( - nlp.U[_idx][:, 0] - if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _idx < len(nlp.U) - else [] - ) - _s = nlp.S[_idx][:, 0] - else: - _x = nlp.cx() - if ( - _penalty.integration_rule == QuadratureRule.APPROXIMATE_TRAPEZOIDAL - or _penalty.integration_rule == QuadratureRule.TRAPEZOIDAL - ): - _x = vertcat(_x, nlp.X_scaled[_idx][:, 0]) - else: - for i in range(nlp.X_scaled[_idx].shape[1]): - _x = vertcat(_x, nlp.X_scaled[_idx][:, i]) - - # Watch out, this is ok for all of our current built-in functions, but it is not generally ok to do that - if ( - _idx == nlp.ns - and nlp.ode_solver.is_direct_collocation - and nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE - and _penalty.node[0] != Node.END - and _penalty.integration_rule != QuadratureRule.APPROXIMATE_TRAPEZOIDAL - ): - for i in range(1, nlp.X_scaled[_idx - 1].shape[1]): - _x = vertcat(_x, nlp.X_scaled[_idx - 1][:, i]) - - if sum(_penalty.weighted_function[_idx].size_in(1)) == 0: - _u = [] - elif nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE and _idx == len(nlp.U_scaled): - _u = nlp.U_scaled[_idx - 1][:, 0] - elif _idx < len(nlp.U_scaled): - _u = nlp.U_scaled[_idx][:, 0] - else: - _u = [] - _s = nlp.S_scaled[_idx][:, 0] - - if _penalty.explicit_derivative: - if _idx < nlp.ns: - if is_unscaled: - x = nlp.X[_idx + 1][:, 0] - if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE and _idx + 1 == len(nlp.U): - u = nlp.U[_idx][:, 0] - elif _idx + 1 < len(nlp.U): - u = nlp.U[_idx + 1][:, 0] - else: - u = [] - s = nlp.S[_idx + 1][:, 0] - else: - x = nlp.X_scaled[_idx + 1][:, 0] - if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE and _idx + 1 == len(nlp.U_scaled): - u = nlp.U_scaled[_idx][:, 0] - elif _idx + 1 < len(nlp.U_scaled): - u = nlp.U_scaled[_idx + 1][:, 0] - else: - u = [] - s = nlp.S_scaled[_idx + 1][:, 0] - - _x = vertcat(_x, x) - _u = vertcat(_u, u) - _s = vertcat(_s, s) - - if _penalty.derivative: - if _idx < nlp.ns: - if is_unscaled: - x = nlp.X[_idx + 1][:, 0] - if _idx + 1 == len(nlp.U): - u = nlp.U[_idx][:, 0] - elif _idx + 1 < len(nlp.U): - u = nlp.U[_idx + 1][:, 0] - else: - u = [] - s = nlp.S[_idx + 1][:, 0] - else: - x = nlp.X_scaled[_idx + 1][:, 0] - if _idx + 1 == len(nlp.U_scaled): - u = nlp.U_scaled[_idx][:, 0] - elif _idx + 1 < len(nlp.U_scaled): - u = nlp.U_scaled[_idx + 1][:, 0] - else: - u = [] - s = nlp.S_scaled[_idx + 1][:, 0] - - _x = vertcat(_x, x) - _u = vertcat(_u, u) - _s = vertcat(_s, s) - - if _penalty.integration_rule == QuadratureRule.APPROXIMATE_TRAPEZOIDAL: - if is_unscaled: - x = nlp.X[_idx + 1][:, 0] - s = nlp.S[_idx + 1][:, 0] - else: - x = nlp.X_scaled[_idx + 1][:, 0] - s = nlp.S_scaled[_idx + 1][:, 0] - _x = vertcat(_x, x) - _s = vertcat(_s, s) - if nlp.control_type == ControlType.LINEAR_CONTINUOUS: - if is_unscaled: - u = ( - nlp.U[_idx + 1][:, 0] - if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _idx + 1 < len(nlp.U) - else [] - ) - else: - u = ( - nlp.U_scaled[_idx + 1][:, 0] - if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _idx + 1 < len(nlp.U_scaled) - else [] - ) - _u = vertcat(_u, u) - - elif _penalty.integration_rule == QuadratureRule.TRAPEZOIDAL: - if nlp.control_type == ControlType.LINEAR_CONTINUOUS: - if is_unscaled: - u = ( - nlp.U[_idx + 1][:, 0] - if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _idx + 1 < len(nlp.U) - else [] - ) - else: - u = ( - nlp.U_scaled[_idx + 1][:, 0] - if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _idx + 1 < len(nlp.U_scaled) - else [] - ) - _u = vertcat(_u, u) - return _x, _u, _s - param = interface.ocp.cx(interface.ocp.parameters.cx) out = interface.ocp.cx() for penalty in penalties: @@ -696,7 +203,7 @@ def get_control_modificator(index): u = nlp.cx() s = nlp.cx() for idx in penalty.node_idx: - x_tp, u_tp, s_tp = get_x_and_u_at_idx(penalty, idx, is_unscaled) + x_tp, u_tp, s_tp = get_x_u_s_at_idx(interface, nlp, penalty, idx, is_unscaled) x = horzcat(x, x_tp) u = horzcat(u, u_tp) s = horzcat(s, s_tp) @@ -714,11 +221,11 @@ def get_control_modificator(index): penalty.integration_rule == QuadratureRule.APPROXIMATE_TRAPEZOIDAL or penalty.integration_rule == QuadratureRule.TRAPEZOIDAL ): - target0 = format_target(penalty.target[0]) - target1 = format_target(penalty.target[1]) + target0 = format_target(penalty, penalty.target[0], idx) + target1 = format_target(penalty, penalty.target[1], idx) target = np.vstack((target0, target1)).T else: - target = format_target(penalty.target[0]) + target = format_target(penalty, penalty.target[0], idx) if np.isnan(np.sum(target)): continue @@ -728,7 +235,7 @@ def get_control_modificator(index): u = [] s = [] else: - x, u, s = get_x_and_u_at_idx(penalty, idx, is_unscaled) + x, u, s = get_x_u_s_at_idx(interface, nlp, penalty, idx, is_unscaled) time = interface.ocp.node_time(phase_idx=0 if nlp == [] else nlp.phase_idx, node_idx=idx) p = vertcat( p, penalty.weighted_function[idx](time, x, u, param, s, penalty.weight, target, penalty.dt) @@ -736,3 +243,443 @@ def get_control_modificator(index): out = vertcat(out, sum2(p)) return out + + +def format_target(penalty, target_in: np.ndarray, idx: int) -> np.ndarray: + """ + Format the target of a penalty to a numpy array + + Parameters + ---------- + penalty: + The penalty with a target + target_in: np.ndarray + The target of the penalty + idx: int + The index of the node + Returns + ------- + np.ndarray + The target of the penalty formatted to a numpy ndarray + """ + if len(target_in.shape) not in [2, 3]: + raise NotImplementedError("penalty target with dimension != 2 or 3 is not implemented yet") + + target_out = target_in[..., penalty.node_idx.index(idx)] + + return target_out + + +def get_control_modificator(ocp, _penalty, index: int): + current_phase = ocp.nlp[_penalty.nodes_phase[index]] + current_node = _penalty.nodes[index] + phase_dynamics = current_phase.phase_dynamics + number_of_shooting_points = current_phase.ns + + is_shared_dynamics = phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE + is_end_or_shooting_point = current_node == Node.END or current_node == number_of_shooting_points + + return 1 if is_shared_dynamics and is_end_or_shooting_point else 0 + + +def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): + """ """ + + if _penalty.transition: + ocp = interface.ocp + cx = interface.ocp.cx + + all_nlp = interface.ocp.nlp + + phase_node0 = _penalty.nodes_phase[0] + phase_node1 = _penalty.nodes_phase[1] + + node_idx_0 = _penalty.all_nodes_index[0] + node_idx_1 = _penalty.all_nodes_index[1] + + u0_mode = get_control_modificator(ocp, _penalty, 0) + u1_mode = get_control_modificator(ocp, _penalty, 1) + + _x_0 = get_padded_array( + nlp=all_nlp[phase_node0], + attribute="X" if is_unscaled else "X_scaled", + node_idx=node_idx_0, + target_length=all_nlp[phase_node1].X_scaled[node_idx_1].shape[0], + casadi_constructor=cx, + ) + _x_1 = get_padded_array( + nlp=all_nlp[phase_node1], + attribute="X" if is_unscaled else "X_scaled", + node_idx=node_idx_1, + target_length=all_nlp[phase_node0].X_scaled[node_idx_0].shape[0], + casadi_constructor=cx, + ) + + _s_0 = get_padded_array( + nlp=all_nlp[phase_node0], + attribute="S" if is_unscaled else "S_scaled", + node_idx=node_idx_0, + target_length=all_nlp[phase_node1].S[node_idx_1].shape[0], + casadi_constructor=cx, + ) + _s_1 = get_padded_array( + nlp=all_nlp[phase_node1], + attribute="S" if is_unscaled else "S_scaled", + node_idx=node_idx_1, + target_length=all_nlp[phase_node0].S[node_idx_0].shape[0], + casadi_constructor=cx, + ) + + is_shared_dynamics_0, is_node0_within_control_limit, len_u_0 = get_node_control_info( + all_nlp[phase_node0], node_idx_0, attribute="U" if is_unscaled else "U_scaled" + ) + is_shared_dynamics_1, is_node1_within_control_limit, len_u_1 = get_node_control_info( + all_nlp[phase_node1], node_idx_1, attribute="U" if is_unscaled else "U_scaled" + ) + + _u_0 = get_padded_control_array( + all_nlp[phase_node0], + node_idx_0, + attribute="U" if is_unscaled else "U_scaled", + u_mode=u0_mode, + target_length=len_u_1, + is_shared_dynamics_target=is_shared_dynamics_1, + is_within_control_limit_target=is_node1_within_control_limit, + casadi_constructor=cx, + ) + + _u_1 = get_padded_control_array( + all_nlp[phase_node1], + node_idx_1, + attribute="U" if is_unscaled else "U_scaled", + u_mode=u1_mode, + target_length=len_u_0, + is_shared_dynamics_target=is_shared_dynamics_0, + is_within_control_limit_target=is_node0_within_control_limit, + casadi_constructor=cx, + ) + + _x = vertcat(_x_1, _x_0) + _u = vertcat(_u_1, _u_0) + _s = vertcat(_s_1, _s_0) + + elif _penalty.multinode_penalty: + ocp = interface.ocp + + # Make an exception to the fact that U is not available for the last node + _x = ocp.cx() + _u = ocp.cx() + _s = ocp.cx() + for i in range(len(_penalty.nodes_phase)): + nlp_i = ocp.nlp[_penalty.nodes_phase[i]] + index_i = _penalty.multinode_idx[i] + ui_mode = get_control_modificator(ocp, _penalty=_penalty, index=i) + + if is_unscaled: + _x_tp = nlp_i.cx() + if _penalty.integration_rule == QuadratureRule.APPROXIMATE_TRAPEZOIDAL: + _x_tp = vertcat(_x_tp, nlp_i.X[index_i][:, 0]) + else: + for i in range(nlp_i.X[index_i].shape[1]): + _x_tp = vertcat(_x_tp, nlp_i.X[index_i][:, i]) + _u_tp = ( + nlp_i.U[index_i - ui_mode] + if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or index_i < len(nlp_i.U) + else [] + ) + _s_tp = nlp_i.S[index_i] + else: + _x_tp = nlp_i.cx() + if _penalty.integration_rule == QuadratureRule.APPROXIMATE_TRAPEZOIDAL: + _x_tp = vertcat(_x_tp, nlp_i.X_scaled[index_i][:, 0]) + else: + for i in range(nlp_i.X_scaled[index_i].shape[1]): + _x_tp = vertcat(_x_tp, nlp_i.X_scaled[index_i][:, i]) + _u_tp = ( + nlp_i.U_scaled[index_i - ui_mode] + if nlp_i.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or index_i < len(nlp_i.U_scaled) + else [] + ) + _s_tp = nlp_i.S_scaled[index_i] + + _x = vertcat(_x, _x_tp) + _u = vertcat(_u, _u_tp) + _s = vertcat(_s, _s_tp) + + elif _penalty.integrate: + if is_unscaled: + _x = nlp.cx() + for i in range(nlp.X[_idx].shape[1]): + _x = vertcat(_x, nlp.X[_idx][:, i]) + _u = ( + nlp.U[_idx][:, 0] + if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _idx < len(nlp.U) + else [] + ) + _s = nlp.S[_idx] + else: + _x = nlp.cx() + for i in range(nlp.X_scaled[_idx].shape[1]): + _x = vertcat(_x, nlp.X_scaled[_idx][:, i]) + _u = ( + nlp.U_scaled[_idx][:, 0] + if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _idx < len(nlp.U_scaled) + else [] + ) + _s = nlp.S_scaled[_idx] + else: + if is_unscaled: + _x = nlp.cx() + if ( + _penalty.integration_rule == QuadratureRule.APPROXIMATE_TRAPEZOIDAL + or _penalty.integration_rule == QuadratureRule.TRAPEZOIDAL + ): + _x = vertcat(_x, nlp.X[_idx][:, 0]) + else: + for i in range(nlp.X[_idx].shape[1]): + _x = vertcat(_x, nlp.X[_idx][:, i]) + + # Watch out, this is ok for all of our current built-in functions, but it is not generally ok to do that + if ( + _idx == nlp.ns + and nlp.ode_solver.is_direct_collocation + and nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE + and _penalty.node[0] != Node.END + and _penalty.integration_rule != QuadratureRule.APPROXIMATE_TRAPEZOIDAL + ): + for i in range(1, nlp.X[_idx - 1].shape[1]): + _x = vertcat(_x, nlp.X[_idx - 1][:, i]) + + _u = ( + nlp.U[_idx][:, 0] + if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _idx < len(nlp.U) + else [] + ) + _s = nlp.S[_idx][:, 0] + else: + _x = nlp.cx() + if ( + _penalty.integration_rule == QuadratureRule.APPROXIMATE_TRAPEZOIDAL + or _penalty.integration_rule == QuadratureRule.TRAPEZOIDAL + ): + _x = vertcat(_x, nlp.X_scaled[_idx][:, 0]) + else: + for i in range(nlp.X_scaled[_idx].shape[1]): + _x = vertcat(_x, nlp.X_scaled[_idx][:, i]) + + # Watch out, this is ok for all of our current built-in functions, but it is not generally ok to do that + if ( + _idx == nlp.ns + and nlp.ode_solver.is_direct_collocation + and nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE + and _penalty.node[0] != Node.END + and _penalty.integration_rule != QuadratureRule.APPROXIMATE_TRAPEZOIDAL + ): + for i in range(1, nlp.X_scaled[_idx - 1].shape[1]): + _x = vertcat(_x, nlp.X_scaled[_idx - 1][:, i]) + + if sum(_penalty.weighted_function[_idx].size_in(1)) == 0: + _u = [] + elif nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE and _idx == len(nlp.U_scaled): + _u = nlp.U_scaled[_idx - 1][:, 0] + elif _idx < len(nlp.U_scaled): + _u = nlp.U_scaled[_idx][:, 0] + else: + _u = [] + _s = nlp.S_scaled[_idx][:, 0] + + if _penalty.explicit_derivative: + if _idx < nlp.ns: + if is_unscaled: + x = nlp.X[_idx + 1][:, 0] + if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE and _idx + 1 == len(nlp.U): + u = nlp.U[_idx][:, 0] + elif _idx + 1 < len(nlp.U): + u = nlp.U[_idx + 1][:, 0] + else: + u = [] + s = nlp.S[_idx + 1][:, 0] + else: + x = nlp.X_scaled[_idx + 1][:, 0] + if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE and _idx + 1 == len(nlp.U_scaled): + u = nlp.U_scaled[_idx][:, 0] + elif _idx + 1 < len(nlp.U_scaled): + u = nlp.U_scaled[_idx + 1][:, 0] + else: + u = [] + s = nlp.S_scaled[_idx + 1][:, 0] + + _x = vertcat(_x, x) + _u = vertcat(_u, u) + _s = vertcat(_s, s) + + if _penalty.derivative: + if _idx < nlp.ns: + if is_unscaled: + x = nlp.X[_idx + 1][:, 0] + if _idx + 1 == len(nlp.U): + u = nlp.U[_idx][:, 0] + elif _idx + 1 < len(nlp.U): + u = nlp.U[_idx + 1][:, 0] + else: + u = [] + s = nlp.S[_idx + 1][:, 0] + else: + x = nlp.X_scaled[_idx + 1][:, 0] + if _idx + 1 == len(nlp.U_scaled): + u = nlp.U_scaled[_idx][:, 0] + elif _idx + 1 < len(nlp.U_scaled): + u = nlp.U_scaled[_idx + 1][:, 0] + else: + u = [] + s = nlp.S_scaled[_idx + 1][:, 0] + + _x = vertcat(_x, x) + _u = vertcat(_u, u) + _s = vertcat(_s, s) + + if _penalty.integration_rule == QuadratureRule.APPROXIMATE_TRAPEZOIDAL: + if is_unscaled: + x = nlp.X[_idx + 1][:, 0] + s = nlp.S[_idx + 1][:, 0] + else: + x = nlp.X_scaled[_idx + 1][:, 0] + s = nlp.S_scaled[_idx + 1][:, 0] + _x = vertcat(_x, x) + _s = vertcat(_s, s) + if nlp.control_type == ControlType.LINEAR_CONTINUOUS: + if is_unscaled: + u = ( + nlp.U[_idx + 1][:, 0] + if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _idx + 1 < len(nlp.U) + else [] + ) + else: + u = ( + nlp.U_scaled[_idx + 1][:, 0] + if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _idx + 1 < len(nlp.U_scaled) + else [] + ) + _u = vertcat(_u, u) + + elif _penalty.integration_rule == QuadratureRule.TRAPEZOIDAL: + if nlp.control_type == ControlType.LINEAR_CONTINUOUS: + if is_unscaled: + u = ( + nlp.U[_idx + 1][:, 0] + if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _idx + 1 < len(nlp.U) + else [] + ) + else: + u = ( + nlp.U_scaled[_idx + 1][:, 0] + if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _idx + 1 < len(nlp.U_scaled) + else [] + ) + _u = vertcat(_u, u) + return _x, _u, _s + + +def get_padded_array( + nlp, attribute: str, node_idx: int, casadi_constructor: Callable, target_length: int = None +) -> SX | MX: + """ + Get a padded array of the correct length + + Parameters + ---------- + nlp: NonLinearProgram + The current phase + attribute: str + The attribute to get the array from such as "X", "X_scaled", "U", "U_scaled", "S", "S_scaled" + node_idx: int + The node index in the current phase + target_length: int + The target length of the array, in some cases, one side can be longer than the other one + (e.g. when using uneven transition phase with a different of states between the two phases) + casadi_constructor: Callable + The casadi constructor to use that either build SX or MX + + Returns + ------- + SX | MX + The padded array + """ + padded_array = getattr(nlp, attribute)[node_idx][:, 0] + len_x = padded_array.shape[0] + + if target_length is None: + target_length = len_x + + if target_length > len_x: + fake_padding = casadi_constructor(target_length - len_x, 1) + padded_array = vertcat(padded_array, fake_padding) + + return padded_array + + +def get_node_control_info(nlp, node_idx, attribute: str): + """This returns the information about the control at a given node to format controls properly""" + is_shared_dynamics = nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE + is_within_control_limit = node_idx < len(nlp.U_scaled) + len_u = getattr(nlp, attribute)[0].shape[0] + + return is_shared_dynamics, is_within_control_limit, len_u + + +def get_padded_control_array( + nlp, + node_idx: int, + u_mode: int, + attribute: str, + target_length: int, + is_within_control_limit_target: bool, + is_shared_dynamics_target: bool, + casadi_constructor: Callable, +): + """ + Get a padded array of the correct length + + Parameters + ---------- + nlp: NonLinearProgram + The current phase + node_idx: int + The node index in the current phase + u_mode: int + The control mode see get_control_modificator + attribute: str + The attribute to get the array from such as "X", "X_scaled", "U", "U_scaled", "S", "S_scaled" + target_length: int + The target length of the array, in some cases, one side can be longer than the other one + (e.g. when using uneven transition phase with a different of states between the two phases) + is_within_control_limit_target: bool + If the target node of a given phase is within the control limit + (e.g. when using uneven transition phase with a different of states between the two phases) + is_shared_dynamics_target: bool + If the target node of a given phase is shared during the phase + (e.g. when using uneven transition phase with a different of states between the two phases) + casadi_constructor: Callable + The casadi constructor to use that either build SX or MX + + Returns + ------- + SX | MX + The padded array + """ + + is_shared_dynamics, is_within_control_limit, len_u = get_node_control_info(nlp, node_idx, attribute=attribute) + + _u_sym = [] + + if is_shared_dynamics or is_within_control_limit: + should_apply_fake_padding_on_u_sym = target_length > len_u and ( + is_within_control_limit_target or is_shared_dynamics_target + ) + _u_sym = getattr(nlp, attribute)[node_idx - u_mode] + + if should_apply_fake_padding_on_u_sym: + fake_padding = casadi_constructor(target_length - len_u, 1) + _u_sym = vertcat(_u_sym, fake_padding) + + return _u_sym diff --git a/tests/shard6/test_unit_solver_interface.py b/tests/shard6/test_unit_solver_interface.py new file mode 100644 index 000000000..5656668fd --- /dev/null +++ b/tests/shard6/test_unit_solver_interface.py @@ -0,0 +1,139 @@ +import pytest +from casadi import SX, MX, vertcat, Function +import numpy as np + +from bioptim import NonLinearProgram, PhaseDynamics +from bioptim.interfaces.interface_utils import get_padded_array, get_node_control_info, get_padded_control_array + + +@pytest.fixture +def nlp_sx(): + # Create a dummy NonLinearProgram object with necessary attributes + nlp = NonLinearProgram(None) + nlp.X = [SX([[1], [2], [3]])] + nlp.X_scaled = [SX([[4], [5], [6]])] + # Add more attributes as needed + return nlp + + +@pytest.fixture +def nlp_mx(): + # Create a dummy NonLinearProgram object with necessary attributes + nlp = NonLinearProgram(None) + nlp.X = [MX(np.array([[1], [2], [3]]))] + nlp.X_scaled = [MX(np.array([[4], [5], [6]]))] + # Add more attributes as needed + return nlp + + +def test_valid_input(nlp_sx): + result = get_padded_array(nlp_sx, "X", 0, SX, 5) + expected = vertcat(SX([[1], [2], [3]]), SX(2, 1)) + assert (result - expected).is_zero() + + +def test_no_padding(nlp_sx): + result = get_padded_array(nlp_sx, "X", 0, SX) + expected = SX([[1], [2], [3]]) + assert (result - expected).is_zero() + + +def test_custom_target_length(nlp_sx): + result = get_padded_array(nlp_sx, "X", 0, SX, 4) + expected = vertcat(SX([[1], [2], [3]]), SX(1, 1)) + assert (result - expected).is_zero() + + +def test_invalid_attribute(nlp_sx): + with pytest.raises(AttributeError): + get_padded_array(nlp_sx, "invalid_attribute", 0, SX) + + +def test_invalid_node_idx(nlp_sx): + with pytest.raises(IndexError): + get_padded_array(nlp_sx, "X", 10, SX) + + +def test_sx(nlp_sx): + result_sx = get_padded_array(nlp_sx, "X", 0, SX, 5) + expected = vertcat(SX([[1], [2], [3]]), SX(2, 1)) + assert (result_sx - expected).is_zero() + + +def test_mx(nlp_mx): + result_mx = get_padded_array(nlp_mx, "X", 0, MX, 5) + expected = vertcat(MX(np.array([[1], [2], [3]])), MX(2, 1)) + res = Function("test", [], [result_mx - expected]) + assert res()["o0"].is_zero() + + +@pytest.fixture +def nlp_control_sx(): + nlp = NonLinearProgram(PhaseDynamics.SHARED_DURING_THE_PHASE) + nlp.U_scaled = [SX([[1], [2], [3]])] + return nlp + + +@pytest.fixture +def nlp_control_mx(): + nlp = NonLinearProgram(PhaseDynamics.SHARED_DURING_THE_PHASE) + nlp.U_scaled = [MX(np.array([[1], [2], [3]]))] + return nlp + + +def test_get_node_control_info(nlp_control_sx): + is_shared_dynamics, is_within_control_limit, len_u = get_node_control_info(nlp_control_sx, 0, "U_scaled") + assert is_shared_dynamics + assert is_within_control_limit + assert len_u == 3 + + +def test_get_padded_control_array_sx(nlp_control_sx): + _u_sym = get_padded_control_array(nlp_control_sx, 0, 0, "U_scaled", 5, True, True, SX) + expected = vertcat(SX([[1], [2], [3]]), SX(2, 1)) + assert (_u_sym - expected).is_zero() + + +def test_get_padded_control_array_mx(nlp_control_mx): + _u_sym = get_padded_control_array(nlp_control_mx, 0, 0, "U_scaled", 5, True, True, MX) + expected = vertcat(MX(np.array([[1], [2], [3]])), MX(2, 1)) + res = Function("test", [], [_u_sym - expected]) + assert res()["o0"].is_zero() + + +def test_get_node_control_info_not_shared(nlp_control_sx): + nlp_control_sx.phase_dynamics = PhaseDynamics.ONE_PER_NODE + is_shared_dynamics, _, _ = get_node_control_info(nlp_control_sx, 0, "U_scaled") + assert not is_shared_dynamics + + +def test_get_node_control_info_outside_limit(nlp_control_sx): + is_shared_dynamics, is_within_control_limit, _ = get_node_control_info(nlp_control_sx, 10, "U_scaled") + assert not is_within_control_limit + + +def test_get_padded_control_array_no_padding_sx(nlp_control_sx): + _u_sym = get_padded_control_array(nlp_control_sx, 0, 0, "U_scaled", 3, True, True, SX) + expected = SX([[1], [2], [3]]) + assert (_u_sym - expected).is_zero() + + +def test_get_padded_control_array_different_u_mode_sx(nlp_control_sx): + _u_sym = get_padded_control_array(nlp_control_sx, 1, 1, "U_scaled", 5, True, True, SX) + expected = vertcat(SX([[1], [2], [3]]), SX(2, 1)) + assert (_u_sym - expected).is_zero() + + +def test_get_padded_control_array_no_shared_dynamics_sx(nlp_control_sx): + nlp_control_sx.phase_dynamics = PhaseDynamics.ONE_PER_NODE + _u_sym = get_padded_control_array(nlp_control_sx, 0, 0, "U_scaled", 5, True, False, SX) + expected = vertcat(SX([[1], [2], [3]]), SX(2, 1)) + assert (_u_sym - expected).is_zero() + + +def test_get_padded_control_array_outside_control_limit_sx(nlp_control_sx): + with pytest.raises(IndexError): + get_padded_control_array(nlp_control_sx, 10, 0, "U_scaled", 5, False, True, SX) + + _u_sym = get_padded_control_array(nlp_control_sx, 0, 0, "U_scaled", 5, False, False, SX) + assert (_u_sym - SX([[1], [2], [3]])).is_zero()