From 75d5bb99993a543dfcb342167a96d50dd294442a Mon Sep 17 00:00:00 2001 From: Ipuch Date: Sat, 4 Nov 2023 16:42:28 -0400 Subject: [PATCH 01/17] removing local functions --- bioptim/interfaces/interface_utils.py | 924 +++++++++++++------------- 1 file changed, 467 insertions(+), 457 deletions(-) diff --git a/bioptim/interfaces/interface_utils.py b/bioptim/interfaces/interface_utils.py index e95807f66..9010958ed 100644 --- a/bioptim/interfaces/interface_utils.py +++ b/bioptim/interfaces/interface_utils.py @@ -174,565 +174,575 @@ def generic_get_all_penalties(interface, nlp: NonLinearProgram, penalties, is_un Parameters ---------- + interface: + A reference to the current interface nlp: NonLinearProgram The nonlinear program to parse the penalties from penalties: The penalties to parse + is_unscaled: bool + If the penalty is unscaled or scaled Returns ------- - + TODO """ - def format_target(target_in: np.array) -> np.array: - """ - Format the target of a penalty to a numpy array - - Parameters - ---------- - target_in: np.array - The target of the penalty - Returns - ------- - np.array - The target of the penalty formatted to a numpy array - """ - if len(target_in.shape) == 2: - target_out = target_in[:, penalty.node_idx.index(idx)] - elif len(target_in.shape) == 3: - target_out = target_in[:, :, penalty.node_idx.index(idx)] + param = interface.ocp.cx(interface.ocp.parameters.cx) + out = interface.ocp.cx() + for penalty in penalties: + if not penalty: + continue + + if penalty.multi_thread: + if penalty.target is not None and len(penalty.target[0].shape) != 2: + raise NotImplementedError("multi_thread penalty with target shape != [n x m] is not implemented yet") + target = penalty.target[0] if penalty.target is not None else [] + + x = nlp.cx() + u = nlp.cx() + s = nlp.cx() + for idx in penalty.node_idx: + x_tp, u_tp, s_tp = get_x_and_u_at_idx(interface, nlp, penalty, idx, is_unscaled) + x = horzcat(x, x_tp) + u = horzcat(u, u_tp) + s = horzcat(s, s_tp) + + # We can call penalty.weighted_function[0] since multi-thread declares all the node at [0] + time = interface.ocp.node_time(phase_idx=nlp.phase_idx, node_idx=penalty.node_idx[-1]) + p = reshape(penalty.weighted_function[0](time, x, u, param, s, penalty.weight, target, penalty.dt), -1, 1) + else: - raise NotImplementedError("penalty target with dimension != 2 or 3 is not implemented yet") - return target_out - - def get_x_and_u_at_idx(_penalty, _idx, is_unscaled): - """ """ - - def get_control_modificator(index): - return ( - 1 - if ocp.nlp[_penalty.nodes_phase[index]].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE - and ( - _penalty.nodes[index] == Node.END - or _penalty.nodes[index] == ocp.nlp[_penalty.nodes_phase[index]].ns - ) - else 0 - ) + p = interface.ocp.cx() + for idx in penalty.node_idx: + if penalty.target is None: + target = [] + elif ( + penalty.integration_rule == QuadratureRule.APPROXIMATE_TRAPEZOIDAL + or penalty.integration_rule == QuadratureRule.TRAPEZOIDAL + ): + target0 = format_target(penalty, penalty.target[0], idx) + target1 = format_target(penalty, penalty.target[1], idx) + target = np.vstack((target0, target1)).T + else: + target = format_target(penalty, penalty.target[0], idx) + + if np.isnan(np.sum(target)): + continue - if _penalty.transition: - ocp = interface.ocp + if not nlp: + x = [] + u = [] + s = [] + else: + x, u, s = get_x_and_u_at_idx(interface, nlp, penalty, idx, is_unscaled) + time = interface.ocp.node_time(phase_idx=0 if nlp == [] else nlp.phase_idx, node_idx=idx) + p = vertcat( + p, penalty.weighted_function[idx](time, x, u, param, s, penalty.weight, target, penalty.dt) + ) + + out = vertcat(out, sum2(p)) + return out - u0_mode = get_control_modificator(0) - u1_mode = get_control_modificator(1) - if is_unscaled: - if ( +def format_target(penalty, target_in: np.array, idx) -> np.array: + """ + Format the target of a penalty to a numpy array + + Parameters + ---------- + penalty: + The penalty with a target + target_in: np.array + The target of the penalty + Returns + ------- + np.array + The target of the penalty formatted to a numpy array + """ + if len(target_in.shape) == 2: + target_out = target_in[:, penalty.node_idx.index(idx)] + elif len(target_in.shape) == 3: + target_out = target_in[:, :, penalty.node_idx.index(idx)] + else: + raise NotImplementedError("penalty target with dimension != 2 or 3 is not implemented yet") + return target_out + + +def get_control_modificator(ocp, _penalty, index): + return ( + 1 + if ocp.nlp[_penalty.nodes_phase[index]].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE + and ( + _penalty.nodes[index] == Node.END + or _penalty.nodes[index] == ocp.nlp[_penalty.nodes_phase[index]].ns + ) + else 0 + ) + + +def get_x_and_u_at_idx(interface, nlp, _penalty, _idx, is_unscaled): + """ """ + + if _penalty.transition: + ocp = interface.ocp + + u0_mode = get_control_modificator(ocp,_penalty, 0) + u1_mode = get_control_modificator(ocp,_penalty, 1) + + if is_unscaled: + if ( interface.ocp.nlp[_penalty.nodes_phase[1]].X[0][:, 0].shape[0] > interface.ocp.nlp[_penalty.nodes_phase[0]].X[0][:, 0].shape[0] - ): - fake = interface.ocp.cx( - interface.ocp.nlp[_penalty.nodes_phase[1]].X[0][:, 0].shape[0] - - interface.ocp.nlp[_penalty.nodes_phase[0]].X[0][:, 0].shape[0], - 1, - ) - _x_0 = vertcat( - interface.ocp.nlp[_penalty.nodes_phase[0]].X[_penalty.all_nodes_index[0]][:, 0], - fake, - ) - else: - _x_0 = interface.ocp.nlp[_penalty.nodes_phase[0]].X[_penalty.all_nodes_index[0]][:, 0] - if ( + ): + fake = interface.ocp.cx( + interface.ocp.nlp[_penalty.nodes_phase[1]].X[0][:, 0].shape[0] + - interface.ocp.nlp[_penalty.nodes_phase[0]].X[0][:, 0].shape[0], + 1, + ) + _x_0 = vertcat( + interface.ocp.nlp[_penalty.nodes_phase[0]].X[_penalty.all_nodes_index[0]][:, 0], + fake, + ) + else: + _x_0 = interface.ocp.nlp[_penalty.nodes_phase[0]].X[_penalty.all_nodes_index[0]][:, 0] + if ( interface.ocp.nlp[_penalty.nodes_phase[0]].X[0][:, 0].shape[0] > interface.ocp.nlp[_penalty.nodes_phase[1]].X[0][:, 0].shape[0] - ): - fake = interface.ocp.cx( - interface.ocp.nlp[_penalty.nodes_phase[0]].X[0][:, 0].shape[0] - - interface.ocp.nlp[_penalty.nodes_phase[1]].X[0][:, 0].shape[0], - 1, - ) - _x_1 = vertcat( - interface.ocp.nlp[_penalty.nodes_phase[1]].X[_penalty.all_nodes_index[1]][:, 0], - fake, - ) - else: - _x_1 = interface.ocp.nlp[_penalty.nodes_phase[1]].X[_penalty.all_nodes_index[1]][:, 0] + ): + fake = interface.ocp.cx( + interface.ocp.nlp[_penalty.nodes_phase[0]].X[0][:, 0].shape[0] + - interface.ocp.nlp[_penalty.nodes_phase[1]].X[0][:, 0].shape[0], + 1, + ) + _x_1 = vertcat( + interface.ocp.nlp[_penalty.nodes_phase[1]].X[_penalty.all_nodes_index[1]][:, 0], + fake, + ) + else: + _x_1 = interface.ocp.nlp[_penalty.nodes_phase[1]].X[_penalty.all_nodes_index[1]][:, 0] - if interface.ocp.nlp[ - _penalty.nodes_phase[0] - ].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _penalty.all_nodes_index[0] < len( - interface.ocp.nlp[0].U - ): - if ( + if interface.ocp.nlp[ + _penalty.nodes_phase[0] + ].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _penalty.all_nodes_index[0] < len( + interface.ocp.nlp[0].U + ): + if ( interface.ocp.nlp[_penalty.nodes_phase[1]].U[0].shape[0] > interface.ocp.nlp[_penalty.nodes_phase[0]].U[0].shape[0] - ) and ( + ) and ( _penalty.all_nodes_index[1] < len(interface.ocp.nlp[_penalty.nodes_phase[1]].U_scaled) or interface.ocp.nlp[_penalty.nodes_phase[1]].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE - ): - fake = interface.ocp.cx( - interface.ocp.nlp[_penalty.nodes_phase[1]].U[0].shape[0] - - interface.ocp.nlp[_penalty.nodes_phase[0]].U[0].shape[0], - 1, - ) - _u_0 = vertcat( - interface.ocp.nlp[_penalty.nodes_phase[0]].U[_penalty.all_nodes_index[0] - u0_mode], - fake, - ) - else: - _u_0 = interface.ocp.nlp[_penalty.nodes_phase[0]].U[_penalty.all_nodes_index[0] - u0_mode] - else: - _u_0 = [] - if interface.ocp.nlp[ - _penalty.nodes_phase[1] - ].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _penalty.all_nodes_index[1] < len( - interface.ocp.nlp[_penalty.nodes_phase[1]].U - ): - if ( - interface.ocp.nlp[_penalty.nodes_phase[0]].U[0].shape[0] - > interface.ocp.nlp[_penalty.nodes_phase[1]].U[0].shape[0] - ) and ( - _penalty.all_nodes_index[0] < len(interface.ocp.nlp[_penalty.nodes_phase[0]].U_scaled) - or interface.ocp.nlp[_penalty.nodes_phase[0]].phase_dynamics - == PhaseDynamics.SHARED_DURING_THE_PHASE - ): - fake = interface.ocp.cx( - interface.ocp.nlp[_penalty.nodes_phase[0]].U[0].shape[0] - - interface.ocp.nlp[_penalty.nodes_phase[1]].U[0].shape[0], - 1, - ) - _u_1 = vertcat( - interface.ocp.nlp[_penalty.nodes_phase[1]].U[_penalty.all_nodes_index[1] - u1_mode], - fake, - ) - else: - _u_1 = interface.ocp.nlp[_penalty.nodes_phase[1]].U[_penalty.all_nodes_index[1] - u1_mode] - else: - _u_1 = [] - - if ( - interface.ocp.nlp[_penalty.nodes_phase[1]].S[0].shape[0] - > interface.ocp.nlp[_penalty.nodes_phase[0]].S[0].shape[0] ): fake = interface.ocp.cx( - interface.ocp.nlp[_penalty.nodes_phase[1]].S[0].shape[0] - - interface.ocp.nlp[_penalty.nodes_phase[0]].S[0].shape[0], + interface.ocp.nlp[_penalty.nodes_phase[1]].U[0].shape[0] + - interface.ocp.nlp[_penalty.nodes_phase[0]].U[0].shape[0], 1, ) - _s_0 = vertcat( - interface.ocp.nlp[_penalty.nodes_phase[0]].S[_penalty.all_nodes_index[0]], + _u_0 = vertcat( + interface.ocp.nlp[_penalty.nodes_phase[0]].U[_penalty.all_nodes_index[0] - u0_mode], fake, ) else: - _s_0 = interface.ocp.nlp[_penalty.nodes_phase[0]].S[_penalty.all_nodes_index[0]] + _u_0 = interface.ocp.nlp[_penalty.nodes_phase[0]].U[_penalty.all_nodes_index[0] - u0_mode] + else: + _u_0 = [] + if interface.ocp.nlp[ + _penalty.nodes_phase[1] + ].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _penalty.all_nodes_index[1] < len( + interface.ocp.nlp[_penalty.nodes_phase[1]].U + ): if ( - interface.ocp.nlp[_penalty.nodes_phase[0]].S[0].shape[0] - > interface.ocp.nlp[_penalty.nodes_phase[1]].S[0].shape[0] + interface.ocp.nlp[_penalty.nodes_phase[0]].U[0].shape[0] + > interface.ocp.nlp[_penalty.nodes_phase[1]].U[0].shape[0] + ) and ( + _penalty.all_nodes_index[0] < len(interface.ocp.nlp[_penalty.nodes_phase[0]].U_scaled) + or interface.ocp.nlp[_penalty.nodes_phase[0]].phase_dynamics + == PhaseDynamics.SHARED_DURING_THE_PHASE ): fake = interface.ocp.cx( - interface.ocp.nlp[_penalty.nodes_phase[0]].S[0].shape[0] - - interface.ocp.nlp[_penalty.nodes_phase[1]].S[0].shape[0], + interface.ocp.nlp[_penalty.nodes_phase[0]].U[0].shape[0] + - interface.ocp.nlp[_penalty.nodes_phase[1]].U[0].shape[0], 1, ) - _s_1 = vertcat( - interface.ocp.nlp[_penalty.nodes_phase[1]].S[_penalty.all_nodes_index[1]], + _u_1 = vertcat( + interface.ocp.nlp[_penalty.nodes_phase[1]].U[_penalty.all_nodes_index[1] - u1_mode], fake, ) else: - _s_1 = interface.ocp.nlp[_penalty.nodes_phase[1]].S[_penalty.all_nodes_index[1]] + _u_1 = interface.ocp.nlp[_penalty.nodes_phase[1]].U[_penalty.all_nodes_index[1] - u1_mode] + else: + _u_1 = [] + if ( + interface.ocp.nlp[_penalty.nodes_phase[1]].S[0].shape[0] + > interface.ocp.nlp[_penalty.nodes_phase[0]].S[0].shape[0] + ): + fake = interface.ocp.cx( + interface.ocp.nlp[_penalty.nodes_phase[1]].S[0].shape[0] + - interface.ocp.nlp[_penalty.nodes_phase[0]].S[0].shape[0], + 1, + ) + _s_0 = vertcat( + interface.ocp.nlp[_penalty.nodes_phase[0]].S[_penalty.all_nodes_index[0]], + fake, + ) else: - if ( + _s_0 = interface.ocp.nlp[_penalty.nodes_phase[0]].S[_penalty.all_nodes_index[0]] + if ( + interface.ocp.nlp[_penalty.nodes_phase[0]].S[0].shape[0] + > interface.ocp.nlp[_penalty.nodes_phase[1]].S[0].shape[0] + ): + fake = interface.ocp.cx( + interface.ocp.nlp[_penalty.nodes_phase[0]].S[0].shape[0] + - interface.ocp.nlp[_penalty.nodes_phase[1]].S[0].shape[0], + 1, + ) + _s_1 = vertcat( + interface.ocp.nlp[_penalty.nodes_phase[1]].S[_penalty.all_nodes_index[1]], + fake, + ) + else: + _s_1 = interface.ocp.nlp[_penalty.nodes_phase[1]].S[_penalty.all_nodes_index[1]] + + else: + if ( interface.ocp.nlp[_penalty.nodes_phase[1]].X_scaled[0][:, 0].shape[0] > interface.ocp.nlp[_penalty.nodes_phase[0]].X_scaled[0][:, 0].shape[0] - ): - fake = interface.ocp.cx( - interface.ocp.nlp[_penalty.nodes_phase[1]].X_scaled[0][:, 0].shape[0] - - interface.ocp.nlp[_penalty.nodes_phase[0]].X_scaled[0][:, 0].shape[0], - 1, - ) - _x_0 = vertcat( - interface.ocp.nlp[_penalty.nodes_phase[0]].X_scaled[_penalty.all_nodes_index[0]][:, 0], - fake, - ) - else: - _x_0 = interface.ocp.nlp[_penalty.nodes_phase[0]].X_scaled[_penalty.all_nodes_index[0]][:, 0] - if ( + ): + fake = interface.ocp.cx( + interface.ocp.nlp[_penalty.nodes_phase[1]].X_scaled[0][:, 0].shape[0] + - interface.ocp.nlp[_penalty.nodes_phase[0]].X_scaled[0][:, 0].shape[0], + 1, + ) + _x_0 = vertcat( + interface.ocp.nlp[_penalty.nodes_phase[0]].X_scaled[_penalty.all_nodes_index[0]][:, 0], + fake, + ) + else: + _x_0 = interface.ocp.nlp[_penalty.nodes_phase[0]].X_scaled[_penalty.all_nodes_index[0]][:, 0] + if ( interface.ocp.nlp[_penalty.nodes_phase[0]].X_scaled[0][:, 0].shape[0] > interface.ocp.nlp[_penalty.nodes_phase[1]].X_scaled[0][:, 0].shape[0] + ): + fake = interface.ocp.cx( + interface.ocp.nlp[_penalty.nodes_phase[0]].X_scaled[0][:, 0].shape[0] + - interface.ocp.nlp[_penalty.nodes_phase[1]].X_scaled[0][:, 0].shape[0], + 1, + ) + _x_1 = vertcat( + interface.ocp.nlp[_penalty.nodes_phase[1]].X_scaled[_penalty.all_nodes_index[1]][:, 0], + fake, + ) + else: + _x_1 = interface.ocp.nlp[_penalty.nodes_phase[1]].X_scaled[_penalty.all_nodes_index[1]][:, 0] + + if interface.ocp.nlp[ + _penalty.nodes_phase[0] + ].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _penalty.all_nodes_index[0] < len( + interface.ocp.nlp[_penalty.nodes_phase[0]].U_scaled + ): + if ( + interface.ocp.nlp[_penalty.nodes_phase[1]].U_scaled[0].shape[0] + > interface.ocp.nlp[_penalty.nodes_phase[0]].U_scaled[0].shape[0] + ) and ( + _penalty.all_nodes_index[1] < len(interface.ocp.nlp[_penalty.nodes_phase[1]].U_scaled) + or interface.ocp.nlp[_penalty.nodes_phase[1]].phase_dynamics + == PhaseDynamics.SHARED_DURING_THE_PHASE ): fake = interface.ocp.cx( - interface.ocp.nlp[_penalty.nodes_phase[0]].X_scaled[0][:, 0].shape[0] - - interface.ocp.nlp[_penalty.nodes_phase[1]].X_scaled[0][:, 0].shape[0], + interface.ocp.nlp[_penalty.nodes_phase[1]].U_scaled[0].shape[0] + - interface.ocp.nlp[_penalty.nodes_phase[0]].U_scaled[0].shape[0], 1, ) - _x_1 = vertcat( - interface.ocp.nlp[_penalty.nodes_phase[1]].X_scaled[_penalty.all_nodes_index[1]][:, 0], + _u_0 = vertcat( + interface.ocp.nlp[_penalty.nodes_phase[0]].U_scaled[_penalty.all_nodes_index[0] - u0_mode], fake, ) else: - _x_1 = interface.ocp.nlp[_penalty.nodes_phase[1]].X_scaled[_penalty.all_nodes_index[1]][:, 0] - - if interface.ocp.nlp[ - _penalty.nodes_phase[0] - ].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _penalty.all_nodes_index[0] < len( - interface.ocp.nlp[_penalty.nodes_phase[0]].U_scaled - ): - if ( - interface.ocp.nlp[_penalty.nodes_phase[1]].U_scaled[0].shape[0] - > interface.ocp.nlp[_penalty.nodes_phase[0]].U_scaled[0].shape[0] - ) and ( - _penalty.all_nodes_index[1] < len(interface.ocp.nlp[_penalty.nodes_phase[1]].U_scaled) - or interface.ocp.nlp[_penalty.nodes_phase[1]].phase_dynamics - == PhaseDynamics.SHARED_DURING_THE_PHASE - ): - fake = interface.ocp.cx( - interface.ocp.nlp[_penalty.nodes_phase[1]].U_scaled[0].shape[0] - - interface.ocp.nlp[_penalty.nodes_phase[0]].U_scaled[0].shape[0], - 1, - ) - _u_0 = vertcat( - interface.ocp.nlp[_penalty.nodes_phase[0]].U_scaled[_penalty.all_nodes_index[0] - u0_mode], - fake, - ) - else: - _u_0 = interface.ocp.nlp[_penalty.nodes_phase[0]].U_scaled[ - _penalty.all_nodes_index[0] - u0_mode + _u_0 = interface.ocp.nlp[_penalty.nodes_phase[0]].U_scaled[ + _penalty.all_nodes_index[0] - u0_mode ] - else: - _u_0 = [] - if interface.ocp.nlp[ - _penalty.nodes_phase[1] - ].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _penalty.all_nodes_index[1] < len( - interface.ocp.nlp[_penalty.nodes_phase[1]].U_scaled - ): - if ( + else: + _u_0 = [] + if interface.ocp.nlp[ + _penalty.nodes_phase[1] + ].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _penalty.all_nodes_index[1] < len( + interface.ocp.nlp[_penalty.nodes_phase[1]].U_scaled + ): + if ( interface.ocp.nlp[_penalty.nodes_phase[0]].U_scaled[0].shape[0] > interface.ocp.nlp[_penalty.nodes_phase[1]].U_scaled[0].shape[0] - ) and ( + ) and ( _penalty.all_nodes_index[0] < len(interface.ocp.nlp[_penalty.nodes_phase[0]].U_scaled) or interface.ocp.nlp[_penalty.nodes_phase[0]].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE - ): - fake = interface.ocp.cx( - interface.ocp.nlp[_penalty.nodes_phase[0]].U_scaled[0].shape[0] - - interface.ocp.nlp[_penalty.nodes_phase[1]].U_scaled[0].shape[0], - 1, - ) - _u_1 = vertcat( - interface.ocp.nlp[_penalty.nodes_phase[1]].U_scaled[_penalty.all_nodes_index[1] - u1_mode], - fake, - ) - else: - _u_1 = interface.ocp.nlp[_penalty.nodes_phase[1]].U_scaled[ - _penalty.all_nodes_index[1] - u1_mode - ] - else: - _u_1 = [] - - if ( - interface.ocp.nlp[_penalty.nodes_phase[1]].S_scaled[0].shape[0] - > interface.ocp.nlp[_penalty.nodes_phase[0]].S_scaled[0].shape[0] ): fake = interface.ocp.cx( - interface.ocp.nlp[_penalty.nodes_phase[1]].S_scaled[0].shape[0] - - interface.ocp.nlp[_penalty.nodes_phase[0]].S_scaled[0].shape[0], + interface.ocp.nlp[_penalty.nodes_phase[0]].U_scaled[0].shape[0] + - interface.ocp.nlp[_penalty.nodes_phase[1]].U_scaled[0].shape[0], 1, ) - _s_0 = vertcat( - interface.ocp.nlp[_penalty.nodes_phase[0]].S_scaled[_penalty.all_nodes_index[0]], + _u_1 = vertcat( + interface.ocp.nlp[_penalty.nodes_phase[1]].U_scaled[_penalty.all_nodes_index[1] - u1_mode], fake, ) else: - _s_0 = interface.ocp.nlp[_penalty.nodes_phase[0]].S_scaled[_penalty.all_nodes_index[0]] - if ( + _u_1 = interface.ocp.nlp[_penalty.nodes_phase[1]].U_scaled[ + _penalty.all_nodes_index[1] - u1_mode + ] + else: + _u_1 = [] + + if ( + interface.ocp.nlp[_penalty.nodes_phase[1]].S_scaled[0].shape[0] + > interface.ocp.nlp[_penalty.nodes_phase[0]].S_scaled[0].shape[0] + ): + fake = interface.ocp.cx( + interface.ocp.nlp[_penalty.nodes_phase[1]].S_scaled[0].shape[0] + - interface.ocp.nlp[_penalty.nodes_phase[0]].S_scaled[0].shape[0], + 1, + ) + _s_0 = vertcat( + interface.ocp.nlp[_penalty.nodes_phase[0]].S_scaled[_penalty.all_nodes_index[0]], + fake, + ) + else: + _s_0 = interface.ocp.nlp[_penalty.nodes_phase[0]].S_scaled[_penalty.all_nodes_index[0]] + if ( interface.ocp.nlp[_penalty.nodes_phase[0]].S_scaled[0].shape[0] > interface.ocp.nlp[_penalty.nodes_phase[1]].S_scaled[0].shape[0] - ): - fake = interface.ocp.cx( - interface.ocp.nlp[_penalty.nodes_phase[0]].S_scaled[0].shape[0] - - interface.ocp.nlp[_penalty.nodes_phase[1]].S_scaled[0].shape[0], - 1, - ) - _s_1 = vertcat( - interface.ocp.nlp[_penalty.nodes_phase[1]].S_scaled[_penalty.all_nodes_index[1]], - fake, - ) - else: - _s_1 = interface.ocp.nlp[_penalty.nodes_phase[1]].S_scaled[_penalty.all_nodes_index[1]] - - _x = vertcat(_x_1, _x_0) - _u = vertcat(_u_1, _u_0) - _s = vertcat(_s_1, _s_0) - - elif _penalty.multinode_penalty: - ocp = interface.ocp - - # Make an exception to the fact that U is not available for the last node - _x = ocp.cx() - _u = ocp.cx() - _s = ocp.cx() - for i in range(len(_penalty.nodes_phase)): - nlp_i = ocp.nlp[_penalty.nodes_phase[i]] - index_i = _penalty.multinode_idx[i] - ui_mode = get_control_modificator(i) - - if is_unscaled: - _x_tp = nlp_i.cx() - if _penalty.integration_rule == QuadratureRule.APPROXIMATE_TRAPEZOIDAL: - _x_tp = vertcat(_x_tp, nlp_i.X[index_i][:, 0]) - else: - for i in range(nlp_i.X[index_i].shape[1]): - _x_tp = vertcat(_x_tp, nlp_i.X[index_i][:, i]) - _u_tp = ( - nlp_i.U[index_i - ui_mode] - if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or index_i < len(nlp_i.U) - else [] - ) - _s_tp = nlp_i.S[index_i] - else: - _x_tp = nlp_i.cx() - if _penalty.integration_rule == QuadratureRule.APPROXIMATE_TRAPEZOIDAL: - _x_tp = vertcat(_x_tp, nlp_i.X_scaled[index_i][:, 0]) - else: - for i in range(nlp_i.X_scaled[index_i].shape[1]): - _x_tp = vertcat(_x_tp, nlp_i.X_scaled[index_i][:, i]) - _u_tp = ( - nlp_i.U_scaled[index_i - ui_mode] - if nlp_i.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE - or index_i < len(nlp_i.U_scaled) - else [] - ) - _s_tp = nlp_i.S_scaled[index_i] + ): + fake = interface.ocp.cx( + interface.ocp.nlp[_penalty.nodes_phase[0]].S_scaled[0].shape[0] + - interface.ocp.nlp[_penalty.nodes_phase[1]].S_scaled[0].shape[0], + 1, + ) + _s_1 = vertcat( + interface.ocp.nlp[_penalty.nodes_phase[1]].S_scaled[_penalty.all_nodes_index[1]], + fake, + ) + else: + _s_1 = interface.ocp.nlp[_penalty.nodes_phase[1]].S_scaled[_penalty.all_nodes_index[1]] - _x = vertcat(_x, _x_tp) - _u = vertcat(_u, _u_tp) - _s = vertcat(_s, _s_tp) + _x = vertcat(_x_1, _x_0) + _u = vertcat(_u_1, _u_0) + _s = vertcat(_s_1, _s_0) + + elif _penalty.multinode_penalty: + ocp = interface.ocp + + # Make an exception to the fact that U is not available for the last node + _x = ocp.cx() + _u = ocp.cx() + _s = ocp.cx() + for i in range(len(_penalty.nodes_phase)): + nlp_i = ocp.nlp[_penalty.nodes_phase[i]] + index_i = _penalty.multinode_idx[i] + ui_mode = get_control_modificator(ocp, _penalty=_penalty,index=i) - elif _penalty.integrate: if is_unscaled: - _x = nlp.cx() - for i in range(nlp.X[_idx].shape[1]): - _x = vertcat(_x, nlp.X[_idx][:, i]) - _u = ( - nlp.U[_idx][:, 0] - if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _idx < len(nlp.U) + _x_tp = nlp_i.cx() + if _penalty.integration_rule == QuadratureRule.APPROXIMATE_TRAPEZOIDAL: + _x_tp = vertcat(_x_tp, nlp_i.X[index_i][:, 0]) + else: + for i in range(nlp_i.X[index_i].shape[1]): + _x_tp = vertcat(_x_tp, nlp_i.X[index_i][:, i]) + _u_tp = ( + nlp_i.U[index_i - ui_mode] + if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or index_i < len(nlp_i.U) else [] ) - _s = nlp.S[_idx] + _s_tp = nlp_i.S[index_i] else: - _x = nlp.cx() - for i in range(nlp.X_scaled[_idx].shape[1]): - _x = vertcat(_x, nlp.X_scaled[_idx][:, i]) - _u = ( - nlp.U_scaled[_idx][:, 0] - if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _idx < len(nlp.U_scaled) + _x_tp = nlp_i.cx() + if _penalty.integration_rule == QuadratureRule.APPROXIMATE_TRAPEZOIDAL: + _x_tp = vertcat(_x_tp, nlp_i.X_scaled[index_i][:, 0]) + else: + for i in range(nlp_i.X_scaled[index_i].shape[1]): + _x_tp = vertcat(_x_tp, nlp_i.X_scaled[index_i][:, i]) + _u_tp = ( + nlp_i.U_scaled[index_i - ui_mode] + if nlp_i.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE + or index_i < len(nlp_i.U_scaled) else [] ) - _s = nlp.S_scaled[_idx] + _s_tp = nlp_i.S_scaled[index_i] + + _x = vertcat(_x, _x_tp) + _u = vertcat(_u, _u_tp) + _s = vertcat(_s, _s_tp) + + elif _penalty.integrate: + if is_unscaled: + _x = nlp.cx() + for i in range(nlp.X[_idx].shape[1]): + _x = vertcat(_x, nlp.X[_idx][:, i]) + _u = ( + nlp.U[_idx][:, 0] + if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _idx < len(nlp.U) + else [] + ) + _s = nlp.S[_idx] else: - if is_unscaled: - _x = nlp.cx() - if ( + _x = nlp.cx() + for i in range(nlp.X_scaled[_idx].shape[1]): + _x = vertcat(_x, nlp.X_scaled[_idx][:, i]) + _u = ( + nlp.U_scaled[_idx][:, 0] + if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _idx < len(nlp.U_scaled) + else [] + ) + _s = nlp.S_scaled[_idx] + else: + if is_unscaled: + _x = nlp.cx() + if ( _penalty.integration_rule == QuadratureRule.APPROXIMATE_TRAPEZOIDAL or _penalty.integration_rule == QuadratureRule.TRAPEZOIDAL - ): - _x = vertcat(_x, nlp.X[_idx][:, 0]) - else: - for i in range(nlp.X[_idx].shape[1]): - _x = vertcat(_x, nlp.X[_idx][:, i]) + ): + _x = vertcat(_x, nlp.X[_idx][:, 0]) + else: + for i in range(nlp.X[_idx].shape[1]): + _x = vertcat(_x, nlp.X[_idx][:, i]) - # Watch out, this is ok for all of our current built-in functions, but it is not generally ok to do that - if ( + # Watch out, this is ok for all of our current built-in functions, but it is not generally ok to do that + if ( _idx == nlp.ns and nlp.ode_solver.is_direct_collocation and nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE and _penalty.node[0] != Node.END and _penalty.integration_rule != QuadratureRule.APPROXIMATE_TRAPEZOIDAL - ): - for i in range(1, nlp.X[_idx - 1].shape[1]): - _x = vertcat(_x, nlp.X[_idx - 1][:, i]) - - _u = ( - nlp.U[_idx][:, 0] - if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _idx < len(nlp.U) - else [] - ) - _s = nlp.S[_idx][:, 0] - else: - _x = nlp.cx() - if ( + ): + for i in range(1, nlp.X[_idx - 1].shape[1]): + _x = vertcat(_x, nlp.X[_idx - 1][:, i]) + + _u = ( + nlp.U[_idx][:, 0] + if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _idx < len(nlp.U) + else [] + ) + _s = nlp.S[_idx][:, 0] + else: + _x = nlp.cx() + if ( _penalty.integration_rule == QuadratureRule.APPROXIMATE_TRAPEZOIDAL or _penalty.integration_rule == QuadratureRule.TRAPEZOIDAL - ): - _x = vertcat(_x, nlp.X_scaled[_idx][:, 0]) - else: - for i in range(nlp.X_scaled[_idx].shape[1]): - _x = vertcat(_x, nlp.X_scaled[_idx][:, i]) + ): + _x = vertcat(_x, nlp.X_scaled[_idx][:, 0]) + else: + for i in range(nlp.X_scaled[_idx].shape[1]): + _x = vertcat(_x, nlp.X_scaled[_idx][:, i]) - # Watch out, this is ok for all of our current built-in functions, but it is not generally ok to do that - if ( + # Watch out, this is ok for all of our current built-in functions, but it is not generally ok to do that + if ( _idx == nlp.ns and nlp.ode_solver.is_direct_collocation and nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE and _penalty.node[0] != Node.END and _penalty.integration_rule != QuadratureRule.APPROXIMATE_TRAPEZOIDAL - ): - for i in range(1, nlp.X_scaled[_idx - 1].shape[1]): - _x = vertcat(_x, nlp.X_scaled[_idx - 1][:, i]) - - if sum(_penalty.weighted_function[_idx].size_in(1)) == 0: - _u = [] - elif nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE and _idx == len(nlp.U_scaled): - _u = nlp.U_scaled[_idx - 1][:, 0] - elif _idx < len(nlp.U_scaled): - _u = nlp.U_scaled[_idx][:, 0] - else: - _u = [] - _s = nlp.S_scaled[_idx][:, 0] - - if _penalty.explicit_derivative: - if _idx < nlp.ns: - if is_unscaled: - x = nlp.X[_idx + 1][:, 0] - if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE and _idx + 1 == len(nlp.U): - u = nlp.U[_idx][:, 0] - elif _idx + 1 < len(nlp.U): - u = nlp.U[_idx + 1][:, 0] - else: - u = [] - s = nlp.S[_idx + 1][:, 0] - else: - x = nlp.X_scaled[_idx + 1][:, 0] - if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE and _idx + 1 == len(nlp.U_scaled): - u = nlp.U_scaled[_idx][:, 0] - elif _idx + 1 < len(nlp.U_scaled): - u = nlp.U_scaled[_idx + 1][:, 0] - else: - u = [] - s = nlp.S_scaled[_idx + 1][:, 0] - - _x = vertcat(_x, x) - _u = vertcat(_u, u) - _s = vertcat(_s, s) - - if _penalty.derivative: - if _idx < nlp.ns: - if is_unscaled: - x = nlp.X[_idx + 1][:, 0] - if _idx + 1 == len(nlp.U): - u = nlp.U[_idx][:, 0] - elif _idx + 1 < len(nlp.U): - u = nlp.U[_idx + 1][:, 0] - else: - u = [] - s = nlp.S[_idx + 1][:, 0] - else: - x = nlp.X_scaled[_idx + 1][:, 0] - if _idx + 1 == len(nlp.U_scaled): - u = nlp.U_scaled[_idx][:, 0] - elif _idx + 1 < len(nlp.U_scaled): - u = nlp.U_scaled[_idx + 1][:, 0] - else: - u = [] - s = nlp.S_scaled[_idx + 1][:, 0] - - _x = vertcat(_x, x) - _u = vertcat(_u, u) - _s = vertcat(_s, s) - - if _penalty.integration_rule == QuadratureRule.APPROXIMATE_TRAPEZOIDAL: + ): + for i in range(1, nlp.X_scaled[_idx - 1].shape[1]): + _x = vertcat(_x, nlp.X_scaled[_idx - 1][:, i]) + + if sum(_penalty.weighted_function[_idx].size_in(1)) == 0: + _u = [] + elif nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE and _idx == len(nlp.U_scaled): + _u = nlp.U_scaled[_idx - 1][:, 0] + elif _idx < len(nlp.U_scaled): + _u = nlp.U_scaled[_idx][:, 0] + else: + _u = [] + _s = nlp.S_scaled[_idx][:, 0] + + if _penalty.explicit_derivative: + if _idx < nlp.ns: if is_unscaled: x = nlp.X[_idx + 1][:, 0] + if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE and _idx + 1 == len(nlp.U): + u = nlp.U[_idx][:, 0] + elif _idx + 1 < len(nlp.U): + u = nlp.U[_idx + 1][:, 0] + else: + u = [] s = nlp.S[_idx + 1][:, 0] else: x = nlp.X_scaled[_idx + 1][:, 0] + if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE and _idx + 1 == len(nlp.U_scaled): + u = nlp.U_scaled[_idx][:, 0] + elif _idx + 1 < len(nlp.U_scaled): + u = nlp.U_scaled[_idx + 1][:, 0] + else: + u = [] s = nlp.S_scaled[_idx + 1][:, 0] + _x = vertcat(_x, x) + _u = vertcat(_u, u) _s = vertcat(_s, s) - if nlp.control_type == ControlType.LINEAR_CONTINUOUS: - if is_unscaled: - u = ( - nlp.U[_idx + 1][:, 0] - if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _idx + 1 < len(nlp.U) - else [] - ) + + if _penalty.derivative: + if _idx < nlp.ns: + if is_unscaled: + x = nlp.X[_idx + 1][:, 0] + if _idx + 1 == len(nlp.U): + u = nlp.U[_idx][:, 0] + elif _idx + 1 < len(nlp.U): + u = nlp.U[_idx + 1][:, 0] else: - u = ( - nlp.U_scaled[_idx + 1][:, 0] - if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _idx + 1 < len(nlp.U_scaled) - else [] - ) - _u = vertcat(_u, u) - - elif _penalty.integration_rule == QuadratureRule.TRAPEZOIDAL: - if nlp.control_type == ControlType.LINEAR_CONTINUOUS: - if is_unscaled: - u = ( - nlp.U[_idx + 1][:, 0] - if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _idx + 1 < len(nlp.U) - else [] - ) + u = [] + s = nlp.S[_idx + 1][:, 0] + else: + x = nlp.X_scaled[_idx + 1][:, 0] + if _idx + 1 == len(nlp.U_scaled): + u = nlp.U_scaled[_idx][:, 0] + elif _idx + 1 < len(nlp.U_scaled): + u = nlp.U_scaled[_idx + 1][:, 0] else: - u = ( - nlp.U_scaled[_idx + 1][:, 0] - if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _idx + 1 < len(nlp.U_scaled) - else [] - ) - _u = vertcat(_u, u) - return _x, _u, _s - - param = interface.ocp.cx(interface.ocp.parameters.cx) - out = interface.ocp.cx() - for penalty in penalties: - if not penalty: - continue - - if penalty.multi_thread: - if penalty.target is not None and len(penalty.target[0].shape) != 2: - raise NotImplementedError("multi_thread penalty with target shape != [n x m] is not implemented yet") - target = penalty.target[0] if penalty.target is not None else [] - - x = nlp.cx() - u = nlp.cx() - s = nlp.cx() - for idx in penalty.node_idx: - x_tp, u_tp, s_tp = get_x_and_u_at_idx(penalty, idx, is_unscaled) - x = horzcat(x, x_tp) - u = horzcat(u, u_tp) - s = horzcat(s, s_tp) + u = [] + s = nlp.S_scaled[_idx + 1][:, 0] - # We can call penalty.weighted_function[0] since multi-thread declares all the node at [0] - time = interface.ocp.node_time(phase_idx=nlp.phase_idx, node_idx=penalty.node_idx[-1]) - p = reshape(penalty.weighted_function[0](time, x, u, param, s, penalty.weight, target, penalty.dt), -1, 1) + _x = vertcat(_x, x) + _u = vertcat(_u, u) + _s = vertcat(_s, s) + if _penalty.integration_rule == QuadratureRule.APPROXIMATE_TRAPEZOIDAL: + if is_unscaled: + x = nlp.X[_idx + 1][:, 0] + s = nlp.S[_idx + 1][:, 0] else: - p = interface.ocp.cx() - for idx in penalty.node_idx: - if penalty.target is None: - target = [] - elif ( - penalty.integration_rule == QuadratureRule.APPROXIMATE_TRAPEZOIDAL - or penalty.integration_rule == QuadratureRule.TRAPEZOIDAL - ): - target0 = format_target(penalty.target[0]) - target1 = format_target(penalty.target[1]) - target = np.vstack((target0, target1)).T - else: - target = format_target(penalty.target[0]) - - if np.isnan(np.sum(target)): - continue + x = nlp.X_scaled[_idx + 1][:, 0] + s = nlp.S_scaled[_idx + 1][:, 0] + _x = vertcat(_x, x) + _s = vertcat(_s, s) + if nlp.control_type == ControlType.LINEAR_CONTINUOUS: + if is_unscaled: + u = ( + nlp.U[_idx + 1][:, 0] + if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _idx + 1 < len(nlp.U) + else [] + ) + else: + u = ( + nlp.U_scaled[_idx + 1][:, 0] + if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _idx + 1 < len(nlp.U_scaled) + else [] + ) + _u = vertcat(_u, u) - if not nlp: - x = [] - u = [] - s = [] - else: - x, u, s = get_x_and_u_at_idx(penalty, idx, is_unscaled) - time = interface.ocp.node_time(phase_idx=0 if nlp == [] else nlp.phase_idx, node_idx=idx) - p = vertcat( - p, penalty.weighted_function[idx](time, x, u, param, s, penalty.weight, target, penalty.dt) - ) + elif _penalty.integration_rule == QuadratureRule.TRAPEZOIDAL: + if nlp.control_type == ControlType.LINEAR_CONTINUOUS: + if is_unscaled: + u = ( + nlp.U[_idx + 1][:, 0] + if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _idx + 1 < len(nlp.U) + else [] + ) + else: + u = ( + nlp.U_scaled[_idx + 1][:, 0] + if nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _idx + 1 < len(nlp.U_scaled) + else [] + ) + _u = vertcat(_u, u) + return _x, _u, _s - out = vertcat(out, sum2(p)) - return out From b8b6b1780b48f0247b07e952881010706fcc15f4 Mon Sep 17 00:00:00 2001 From: Ipuch Date: Sat, 4 Nov 2023 16:46:55 -0400 Subject: [PATCH 02/17] more sens in this function --- bioptim/interfaces/interface_utils.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/bioptim/interfaces/interface_utils.py b/bioptim/interfaces/interface_utils.py index 9010958ed..91957ffa4 100644 --- a/bioptim/interfaces/interface_utils.py +++ b/bioptim/interfaces/interface_utils.py @@ -202,7 +202,7 @@ def generic_get_all_penalties(interface, nlp: NonLinearProgram, penalties, is_un u = nlp.cx() s = nlp.cx() for idx in penalty.node_idx: - x_tp, u_tp, s_tp = get_x_and_u_at_idx(interface, nlp, penalty, idx, is_unscaled) + x_tp, u_tp, s_tp = get_x_u_s_at_idx(interface, nlp, penalty, idx, is_unscaled) x = horzcat(x, x_tp) u = horzcat(u, u_tp) s = horzcat(s, s_tp) @@ -234,7 +234,7 @@ def generic_get_all_penalties(interface, nlp: NonLinearProgram, penalties, is_un u = [] s = [] else: - x, u, s = get_x_and_u_at_idx(interface, nlp, penalty, idx, is_unscaled) + x, u, s = get_x_u_s_at_idx(interface, nlp, penalty, idx, is_unscaled) time = interface.ocp.node_time(phase_idx=0 if nlp == [] else nlp.phase_idx, node_idx=idx) p = vertcat( p, penalty.weighted_function[idx](time, x, u, param, s, penalty.weight, target, penalty.dt) @@ -280,8 +280,9 @@ def get_control_modificator(ocp, _penalty, index): ) -def get_x_and_u_at_idx(interface, nlp, _penalty, _idx, is_unscaled): - """ """ +def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): + """ + """ if _penalty.transition: ocp = interface.ocp From d553fa1f154699c83593cf04067af87422fc3e24 Mon Sep 17 00:00:00 2001 From: Ipuch Date: Sat, 4 Nov 2023 17:22:16 -0400 Subject: [PATCH 03/17] some doc and little refactors --- bioptim/interfaces/interface_utils.py | 44 ++++++++++++++------------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/bioptim/interfaces/interface_utils.py b/bioptim/interfaces/interface_utils.py index 91957ffa4..940f1954f 100644 --- a/bioptim/interfaces/interface_utils.py +++ b/bioptim/interfaces/interface_utils.py @@ -244,7 +244,7 @@ def generic_get_all_penalties(interface, nlp: NonLinearProgram, penalties, is_un return out -def format_target(penalty, target_in: np.array, idx) -> np.array: +def format_target(penalty, target_in: np.ndarray, idx: int) -> np.ndarray: """ Format the target of a penalty to a numpy array @@ -252,32 +252,34 @@ def format_target(penalty, target_in: np.array, idx) -> np.array: ---------- penalty: The penalty with a target - target_in: np.array + target_in: np.ndarray The target of the penalty + idx: int + The index of the node Returns ------- - np.array - The target of the penalty formatted to a numpy array + np.ndarray + The target of the penalty formatted to a numpy ndarray """ - if len(target_in.shape) == 2: - target_out = target_in[:, penalty.node_idx.index(idx)] - elif len(target_in.shape) == 3: - target_out = target_in[:, :, penalty.node_idx.index(idx)] - else: + if len(target_in.shape) not in [2, 3]: raise NotImplementedError("penalty target with dimension != 2 or 3 is not implemented yet") + + target_out = target_in[..., penalty.node_idx.index(idx)] + return target_out -def get_control_modificator(ocp, _penalty, index): - return ( - 1 - if ocp.nlp[_penalty.nodes_phase[index]].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE - and ( - _penalty.nodes[index] == Node.END - or _penalty.nodes[index] == ocp.nlp[_penalty.nodes_phase[index]].ns - ) - else 0 - ) +def get_control_modificator(ocp, _penalty, index: int): + + current_phase = ocp.nlp[_penalty.nodes_phase[index]] + current_node = _penalty.nodes[index] + phase_dynamics = current_phase.phase_dynamics + number_of_shooting_points = current_phase.ns + + is_shared_dynamics = phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE + is_end_or_shooting_point = current_node == Node.END or current_node == number_of_shooting_points + + return 1 if is_shared_dynamics and is_end_or_shooting_point else 0 def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): @@ -287,8 +289,8 @@ def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): if _penalty.transition: ocp = interface.ocp - u0_mode = get_control_modificator(ocp,_penalty, 0) - u1_mode = get_control_modificator(ocp,_penalty, 1) + u0_mode = get_control_modificator(ocp, _penalty, 0) + u1_mode = get_control_modificator(ocp, _penalty, 1) if is_unscaled: if ( From c03a6a17fd18feed566c4630a07d09a6b48af721 Mon Sep 17 00:00:00 2001 From: Ipuch Date: Sat, 4 Nov 2023 19:49:18 -0400 Subject: [PATCH 04/17] penalty transition content for the better --- bioptim/interfaces/interface_utils.py | 304 ++++++++++---------------- 1 file changed, 110 insertions(+), 194 deletions(-) diff --git a/bioptim/interfaces/interface_utils.py b/bioptim/interfaces/interface_utils.py index 940f1954f..4726b6665 100644 --- a/bioptim/interfaces/interface_utils.py +++ b/bioptim/interfaces/interface_utils.py @@ -287,246 +287,163 @@ def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): """ if _penalty.transition: + ocp = interface.ocp u0_mode = get_control_modificator(ocp, _penalty, 0) u1_mode = get_control_modificator(ocp, _penalty, 1) + all_nlp = interface.ocp.nlp + + phase_node0 = _penalty.nodes_phase[0] + phase_node1 = _penalty.nodes_phase[1] + + node_idx_0 = _penalty.all_nodes_index[0] + node_idx_1 = _penalty.all_nodes_index[1] + if is_unscaled: - if ( - interface.ocp.nlp[_penalty.nodes_phase[1]].X[0][:, 0].shape[0] - > interface.ocp.nlp[_penalty.nodes_phase[0]].X[0][:, 0].shape[0] - ): - fake = interface.ocp.cx( - interface.ocp.nlp[_penalty.nodes_phase[1]].X[0][:, 0].shape[0] - - interface.ocp.nlp[_penalty.nodes_phase[0]].X[0][:, 0].shape[0], - 1, - ) - _x_0 = vertcat( - interface.ocp.nlp[_penalty.nodes_phase[0]].X[_penalty.all_nodes_index[0]][:, 0], - fake, - ) - else: - _x_0 = interface.ocp.nlp[_penalty.nodes_phase[0]].X[_penalty.all_nodes_index[0]][:, 0] - if ( - interface.ocp.nlp[_penalty.nodes_phase[0]].X[0][:, 0].shape[0] - > interface.ocp.nlp[_penalty.nodes_phase[1]].X[0][:, 0].shape[0] - ): - fake = interface.ocp.cx( - interface.ocp.nlp[_penalty.nodes_phase[0]].X[0][:, 0].shape[0] - - interface.ocp.nlp[_penalty.nodes_phase[1]].X[0][:, 0].shape[0], - 1, - ) - _x_1 = vertcat( - interface.ocp.nlp[_penalty.nodes_phase[1]].X[_penalty.all_nodes_index[1]][:, 0], - fake, - ) - else: - _x_1 = interface.ocp.nlp[_penalty.nodes_phase[1]].X[_penalty.all_nodes_index[1]][:, 0] + _x_0 = all_nlp[phase_node0].X[node_idx_0][:, 0] + _x_1 = all_nlp[phase_node1].X[node_idx_1][:, 0] - if interface.ocp.nlp[ - _penalty.nodes_phase[0] - ].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _penalty.all_nodes_index[0] < len( - interface.ocp.nlp[0].U - ): - if ( - interface.ocp.nlp[_penalty.nodes_phase[1]].U[0].shape[0] - > interface.ocp.nlp[_penalty.nodes_phase[0]].U[0].shape[0] - ) and ( - _penalty.all_nodes_index[1] < len(interface.ocp.nlp[_penalty.nodes_phase[1]].U_scaled) - or interface.ocp.nlp[_penalty.nodes_phase[1]].phase_dynamics - == PhaseDynamics.SHARED_DURING_THE_PHASE - ): - fake = interface.ocp.cx( - interface.ocp.nlp[_penalty.nodes_phase[1]].U[0].shape[0] - - interface.ocp.nlp[_penalty.nodes_phase[0]].U[0].shape[0], - 1, - ) - _u_0 = vertcat( - interface.ocp.nlp[_penalty.nodes_phase[0]].U[_penalty.all_nodes_index[0] - u0_mode], - fake, - ) + len_x_0 = _x_0.shape[0] + len_x_1 = _x_1.shape[0] + + if len_x_1 > len_x_0: + fake_padding = interface.ocp.cx(len_x_1 - len_x_0, 1) + _x_0 = vertcat(_x_0, fake_padding) + + if len_x_0 > len_x_1: + fake_padding = interface.ocp.cx(len_x_0 - len_x_1, 1) + _x_1 = vertcat(_x_1, fake_padding) + + is_shared_dynamics_0 = all_nlp[phase_node0].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE + is_node0_within_control_limit = node_idx_0 < len(all_nlp[phase_node0].U) + is_shared_dynamics_1 = all_nlp[phase_node1].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE + is_node1_within_control_limit = node_idx_1 < len(all_nlp[phase_node1].U) + + u_0 = all_nlp[phase_node0].U[node_idx_0 - u0_mode] + u_1 = all_nlp[phase_node0].U[node_idx_1 - u1_mode] + + len_u_0 = u_0.shape[0] + len_u_1 = u_1.shape[0] + + if is_shared_dynamics_0 or is_node0_within_control_limit: + should_apply_fake_padding_on_u0 = len_u_1 > len_u_0 and (is_node1_within_control_limit or is_shared_dynamics_1) + if should_apply_fake_padding_on_u0: + fake_padding = interface.ocp.cx(len_u_1 - len_u_0, 1) + _u_0 = vertcat(u_0, fake_padding) else: - _u_0 = interface.ocp.nlp[_penalty.nodes_phase[0]].U[_penalty.all_nodes_index[0] - u0_mode] + _u_0 = u_0 else: _u_0 = [] - if interface.ocp.nlp[ - _penalty.nodes_phase[1] - ].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _penalty.all_nodes_index[1] < len( - interface.ocp.nlp[_penalty.nodes_phase[1]].U - ): - if ( - interface.ocp.nlp[_penalty.nodes_phase[0]].U[0].shape[0] - > interface.ocp.nlp[_penalty.nodes_phase[1]].U[0].shape[0] - ) and ( - _penalty.all_nodes_index[0] < len(interface.ocp.nlp[_penalty.nodes_phase[0]].U_scaled) - or interface.ocp.nlp[_penalty.nodes_phase[0]].phase_dynamics - == PhaseDynamics.SHARED_DURING_THE_PHASE - ): - fake = interface.ocp.cx( - interface.ocp.nlp[_penalty.nodes_phase[0]].U[0].shape[0] - - interface.ocp.nlp[_penalty.nodes_phase[1]].U[0].shape[0], - 1, - ) - _u_1 = vertcat( - interface.ocp.nlp[_penalty.nodes_phase[1]].U[_penalty.all_nodes_index[1] - u1_mode], - fake, - ) + + if is_shared_dynamics_1 or is_node1_within_control_limit: + should_apply_fake_padding_on_u1 = len_u_0 > len_u_1 and (is_node0_within_control_limit or is_shared_dynamics_0) + if should_apply_fake_padding_on_u1: + fake_padding = interface.ocp.cx(len_u_0 - len_u_1, 1) + _u_1 = vertcat(u_1, fake_padding) else: - _u_1 = interface.ocp.nlp[_penalty.nodes_phase[1]].U[_penalty.all_nodes_index[1] - u1_mode] + _u_1 = u_1 else: _u_1 = [] - if ( - interface.ocp.nlp[_penalty.nodes_phase[1]].S[0].shape[0] - > interface.ocp.nlp[_penalty.nodes_phase[0]].S[0].shape[0] - ): - fake = interface.ocp.cx( - interface.ocp.nlp[_penalty.nodes_phase[1]].S[0].shape[0] - - interface.ocp.nlp[_penalty.nodes_phase[0]].S[0].shape[0], - 1, - ) - _s_0 = vertcat( - interface.ocp.nlp[_penalty.nodes_phase[0]].S[_penalty.all_nodes_index[0]], - fake, - ) - else: - _s_0 = interface.ocp.nlp[_penalty.nodes_phase[0]].S[_penalty.all_nodes_index[0]] - if ( - interface.ocp.nlp[_penalty.nodes_phase[0]].S[0].shape[0] - > interface.ocp.nlp[_penalty.nodes_phase[1]].S[0].shape[0] - ): - fake = interface.ocp.cx( - interface.ocp.nlp[_penalty.nodes_phase[0]].S[0].shape[0] - - interface.ocp.nlp[_penalty.nodes_phase[1]].S[0].shape[0], - 1, - ) - _s_1 = vertcat( - interface.ocp.nlp[_penalty.nodes_phase[1]].S[_penalty.all_nodes_index[1]], - fake, - ) - else: - _s_1 = interface.ocp.nlp[_penalty.nodes_phase[1]].S[_penalty.all_nodes_index[1]] + _s_0 = all_nlp[phase_node0].S[node_idx_0] + _s_1 = all_nlp[phase_node1].S[node_idx_1] + + len_s_0 = _s_0.shape[0] + len_s_1 = _s_1.shape[0] + + if len_s_1 > len_s_0: + fake_padding = interface.ocp.cx(len_s_1 - len_s_0, 1) + _s_0 = vertcat(_s_0, fake_padding) + + if len_s_0 > len_s_1: + fake_padding = interface.ocp.cx(len_s_0 - len_s_1, 1) + _s_1 = vertcat(_s_1, fake_padding) else: - if ( - interface.ocp.nlp[_penalty.nodes_phase[1]].X_scaled[0][:, 0].shape[0] - > interface.ocp.nlp[_penalty.nodes_phase[0]].X_scaled[0][:, 0].shape[0] - ): - fake = interface.ocp.cx( - interface.ocp.nlp[_penalty.nodes_phase[1]].X_scaled[0][:, 0].shape[0] - - interface.ocp.nlp[_penalty.nodes_phase[0]].X_scaled[0][:, 0].shape[0], - 1, - ) - _x_0 = vertcat( - interface.ocp.nlp[_penalty.nodes_phase[0]].X_scaled[_penalty.all_nodes_index[0]][:, 0], - fake, - ) - else: - _x_0 = interface.ocp.nlp[_penalty.nodes_phase[0]].X_scaled[_penalty.all_nodes_index[0]][:, 0] - if ( - interface.ocp.nlp[_penalty.nodes_phase[0]].X_scaled[0][:, 0].shape[0] - > interface.ocp.nlp[_penalty.nodes_phase[1]].X_scaled[0][:, 0].shape[0] - ): - fake = interface.ocp.cx( - interface.ocp.nlp[_penalty.nodes_phase[0]].X_scaled[0][:, 0].shape[0] - - interface.ocp.nlp[_penalty.nodes_phase[1]].X_scaled[0][:, 0].shape[0], - 1, - ) - _x_1 = vertcat( - interface.ocp.nlp[_penalty.nodes_phase[1]].X_scaled[_penalty.all_nodes_index[1]][:, 0], - fake, - ) - else: - _x_1 = interface.ocp.nlp[_penalty.nodes_phase[1]].X_scaled[_penalty.all_nodes_index[1]][:, 0] + _x_0 = all_nlp[phase_node0].X_scaled[node_idx_0][:, 0] + _x_1 = all_nlp[phase_node1].X_scaled[node_idx_1][:, 0] + + len_x_0 = _x_0.shape[0] + len_x_1 = _x_1.shape[0] + + if len_x_1 > len_x_0: + fake_padding = interface.ocp.cx(len_x_1 - len_x_0, 1) + _x_0 = vertcat(_x_0, fake_padding) + + if len_x_0 > len_x_1: + fake_padding = interface.ocp.cx(len_x_0 - len_x_1, 1) + _x_1 = vertcat(_x_1, fake_padding) - if interface.ocp.nlp[ - _penalty.nodes_phase[0] - ].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _penalty.all_nodes_index[0] < len( - interface.ocp.nlp[_penalty.nodes_phase[0]].U_scaled + if all_nlp[ + phase_node0 + ].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or node_idx_0 < len( + all_nlp[phase_node0].U_scaled ): if ( - interface.ocp.nlp[_penalty.nodes_phase[1]].U_scaled[0].shape[0] - > interface.ocp.nlp[_penalty.nodes_phase[0]].U_scaled[0].shape[0] + all_nlp[phase_node1].U_scaled[0].shape[0] + > all_nlp[phase_node0].U_scaled[0].shape[0] ) and ( - _penalty.all_nodes_index[1] < len(interface.ocp.nlp[_penalty.nodes_phase[1]].U_scaled) - or interface.ocp.nlp[_penalty.nodes_phase[1]].phase_dynamics + node_idx_1 < len(all_nlp[phase_node1].U_scaled) + or all_nlp[phase_node1].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE ): fake = interface.ocp.cx( - interface.ocp.nlp[_penalty.nodes_phase[1]].U_scaled[0].shape[0] - - interface.ocp.nlp[_penalty.nodes_phase[0]].U_scaled[0].shape[0], + all_nlp[phase_node1].U_scaled[0].shape[0] + - all_nlp[phase_node0].U_scaled[0].shape[0], 1, ) _u_0 = vertcat( - interface.ocp.nlp[_penalty.nodes_phase[0]].U_scaled[_penalty.all_nodes_index[0] - u0_mode], + all_nlp[phase_node0].U_scaled[node_idx_0 - u0_mode], fake, ) else: - _u_0 = interface.ocp.nlp[_penalty.nodes_phase[0]].U_scaled[ - _penalty.all_nodes_index[0] - u0_mode + _u_0 = all_nlp[phase_node0].U_scaled[ + node_idx_0 - u0_mode ] else: _u_0 = [] - if interface.ocp.nlp[ - _penalty.nodes_phase[1] - ].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or _penalty.all_nodes_index[1] < len( - interface.ocp.nlp[_penalty.nodes_phase[1]].U_scaled + if all_nlp[ + phase_node1 + ].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or node_idx_1 < len( + all_nlp[phase_node1].U_scaled ): if ( - interface.ocp.nlp[_penalty.nodes_phase[0]].U_scaled[0].shape[0] - > interface.ocp.nlp[_penalty.nodes_phase[1]].U_scaled[0].shape[0] + all_nlp[phase_node0].U_scaled[0].shape[0] + > all_nlp[phase_node1].U_scaled[0].shape[0] ) and ( - _penalty.all_nodes_index[0] < len(interface.ocp.nlp[_penalty.nodes_phase[0]].U_scaled) - or interface.ocp.nlp[_penalty.nodes_phase[0]].phase_dynamics + node_idx_0 < len(all_nlp[phase_node0].U_scaled) + or all_nlp[phase_node0].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE ): fake = interface.ocp.cx( - interface.ocp.nlp[_penalty.nodes_phase[0]].U_scaled[0].shape[0] - - interface.ocp.nlp[_penalty.nodes_phase[1]].U_scaled[0].shape[0], + all_nlp[phase_node0].U_scaled[0].shape[0] + - all_nlp[phase_node1].U_scaled[0].shape[0], 1, ) _u_1 = vertcat( - interface.ocp.nlp[_penalty.nodes_phase[1]].U_scaled[_penalty.all_nodes_index[1] - u1_mode], + all_nlp[phase_node1].U_scaled[node_idx_1 - u1_mode], fake, ) else: - _u_1 = interface.ocp.nlp[_penalty.nodes_phase[1]].U_scaled[ - _penalty.all_nodes_index[1] - u1_mode - ] + _u_1 = all_nlp[phase_node1].U_scaled[node_idx_1 - u1_mode] else: _u_1 = [] - if ( - interface.ocp.nlp[_penalty.nodes_phase[1]].S_scaled[0].shape[0] - > interface.ocp.nlp[_penalty.nodes_phase[0]].S_scaled[0].shape[0] - ): - fake = interface.ocp.cx( - interface.ocp.nlp[_penalty.nodes_phase[1]].S_scaled[0].shape[0] - - interface.ocp.nlp[_penalty.nodes_phase[0]].S_scaled[0].shape[0], - 1, - ) - _s_0 = vertcat( - interface.ocp.nlp[_penalty.nodes_phase[0]].S_scaled[_penalty.all_nodes_index[0]], - fake, - ) - else: - _s_0 = interface.ocp.nlp[_penalty.nodes_phase[0]].S_scaled[_penalty.all_nodes_index[0]] - if ( - interface.ocp.nlp[_penalty.nodes_phase[0]].S_scaled[0].shape[0] - > interface.ocp.nlp[_penalty.nodes_phase[1]].S_scaled[0].shape[0] - ): - fake = interface.ocp.cx( - interface.ocp.nlp[_penalty.nodes_phase[0]].S_scaled[0].shape[0] - - interface.ocp.nlp[_penalty.nodes_phase[1]].S_scaled[0].shape[0], - 1, - ) - _s_1 = vertcat( - interface.ocp.nlp[_penalty.nodes_phase[1]].S_scaled[_penalty.all_nodes_index[1]], - fake, - ) - else: - _s_1 = interface.ocp.nlp[_penalty.nodes_phase[1]].S_scaled[_penalty.all_nodes_index[1]] + _s_0 = all_nlp[phase_node0].S_scaled[node_idx_0] + _s_1 = all_nlp[phase_node1].S_scaled[node_idx_1] + + len_s_0 = _s_0.shape[0] + len_s_1 = _s_1.shape[0] + + if len_s_1 > len_s_0: + fake_padding = interface.ocp.cx(len_s_1 - len_s_0, 1) + _s_0 = vertcat(_s_0, fake_padding) + + if len_s_0 > len_s_1: + fake_padding = interface.ocp.cx(len_s_0 - len_s_1, 1) + _s_1 = vertcat(_s_1, fake_padding) _x = vertcat(_x_1, _x_0) _u = vertcat(_u_1, _u_0) @@ -748,4 +665,3 @@ def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): ) _u = vertcat(_u, u) return _x, _u, _s - From baea3a999f3da91884eb5f7e305d338d4547955c Mon Sep 17 00:00:00 2001 From: Ipuch Date: Sat, 4 Nov 2023 20:01:04 -0400 Subject: [PATCH 05/17] ok --- bioptim/interfaces/interface_utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/bioptim/interfaces/interface_utils.py b/bioptim/interfaces/interface_utils.py index 4726b6665..d4f1fa286 100644 --- a/bioptim/interfaces/interface_utils.py +++ b/bioptim/interfaces/interface_utils.py @@ -379,6 +379,7 @@ def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): if all_nlp[ phase_node0 ].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or node_idx_0 < len( + if all_nlp[phase_node0].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or node_idx_0 < len( all_nlp[phase_node0].U_scaled ): if ( From 4ce1fec24e17358f7df144af115eaafdc520e4c3 Mon Sep 17 00:00:00 2001 From: Ipuch Date: Sat, 4 Nov 2023 20:39:36 -0400 Subject: [PATCH 06/17] final clean and shrink for tonight --- bioptim/interfaces/interface_utils.py | 84 +++++++++------------------ 1 file changed, 29 insertions(+), 55 deletions(-) diff --git a/bioptim/interfaces/interface_utils.py b/bioptim/interfaces/interface_utils.py index d4f1fa286..cf5be8c08 100644 --- a/bioptim/interfaces/interface_utils.py +++ b/bioptim/interfaces/interface_utils.py @@ -376,61 +376,35 @@ def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): fake_padding = interface.ocp.cx(len_x_0 - len_x_1, 1) _x_1 = vertcat(_x_1, fake_padding) - if all_nlp[ - phase_node0 - ].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or node_idx_0 < len( - if all_nlp[phase_node0].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or node_idx_0 < len( - all_nlp[phase_node0].U_scaled - ): - if ( - all_nlp[phase_node1].U_scaled[0].shape[0] - > all_nlp[phase_node0].U_scaled[0].shape[0] - ) and ( - node_idx_1 < len(all_nlp[phase_node1].U_scaled) - or all_nlp[phase_node1].phase_dynamics - == PhaseDynamics.SHARED_DURING_THE_PHASE - ): - fake = interface.ocp.cx( - all_nlp[phase_node1].U_scaled[0].shape[0] - - all_nlp[phase_node0].U_scaled[0].shape[0], - 1, - ) - _u_0 = vertcat( - all_nlp[phase_node0].U_scaled[node_idx_0 - u0_mode], - fake, - ) - else: - _u_0 = all_nlp[phase_node0].U_scaled[ - node_idx_0 - u0_mode - ] - else: - _u_0 = [] - if all_nlp[ - phase_node1 - ].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or node_idx_1 < len( - all_nlp[phase_node1].U_scaled - ): - if ( - all_nlp[phase_node0].U_scaled[0].shape[0] - > all_nlp[phase_node1].U_scaled[0].shape[0] - ) and ( - node_idx_0 < len(all_nlp[phase_node0].U_scaled) - or all_nlp[phase_node0].phase_dynamics - == PhaseDynamics.SHARED_DURING_THE_PHASE - ): - fake = interface.ocp.cx( - all_nlp[phase_node0].U_scaled[0].shape[0] - - all_nlp[phase_node1].U_scaled[0].shape[0], - 1, - ) - _u_1 = vertcat( - all_nlp[phase_node1].U_scaled[node_idx_1 - u1_mode], - fake, - ) - else: - _u_1 = all_nlp[phase_node1].U_scaled[node_idx_1 - u1_mode] - else: - _u_1 = [] + is_shared_dynamics_0 = all_nlp[phase_node0].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE + is_node0_within_control_limit = node_idx_0 < len(all_nlp[phase_node0].U_scaled) + is_shared_dynamics_1 = all_nlp[phase_node1].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE + is_node1_within_control_limit = node_idx_1 < len(all_nlp[phase_node1].U_scaled) + + _u_0 = [] + _u_1 = [] + + if is_shared_dynamics_0 or is_node0_within_control_limit: + _u_0 = all_nlp[phase_node0].U_scaled[node_idx_0 - u0_mode] + _u_1 = all_nlp[phase_node1].U_scaled[node_idx_1 - u1_mode] + len_u_0 = _u_0.shape[0] + len_u_1 = _u_1.shape[0] + should_apply_fake_padding_on_u0 = len_u_1 > len_u_0 and ( + is_node1_within_control_limit or is_shared_dynamics_1) + if should_apply_fake_padding_on_u0: + fake_padding = interface.ocp.cx(len_u_1 - len_u_0, 1) + _u_0 = vertcat(_u_0, fake_padding) + + if is_shared_dynamics_1 or is_node1_within_control_limit: + _u_0 = all_nlp[phase_node0].U_scaled[node_idx_0 - u0_mode] + _u_1 = all_nlp[phase_node1].U_scaled[node_idx_1 - u1_mode] + len_u_0 = _u_0.shape[0] + len_u_1 = _u_1.shape[0] + should_apply_fake_padding_on_u1 = len_u_0 > len_u_1 and ( + is_node0_within_control_limit or is_shared_dynamics_0) + if should_apply_fake_padding_on_u1: + fake_padding = interface.ocp.cx(len_u_0 - len_u_1, 1) + _u_1 = vertcat(_u_1, fake_padding) _s_0 = all_nlp[phase_node0].S_scaled[node_idx_0] _s_1 = all_nlp[phase_node1].S_scaled[node_idx_1] From b936402cf35d3bf83f92d2bc5051f2334a40cbb3 Mon Sep 17 00:00:00 2001 From: Ipuch Date: Sat, 4 Nov 2023 20:40:31 -0400 Subject: [PATCH 07/17] black --- bioptim/interfaces/interface_utils.py | 52 ++++++++++++++------------- 1 file changed, 27 insertions(+), 25 deletions(-) diff --git a/bioptim/interfaces/interface_utils.py b/bioptim/interfaces/interface_utils.py index cf5be8c08..4e33c5d62 100644 --- a/bioptim/interfaces/interface_utils.py +++ b/bioptim/interfaces/interface_utils.py @@ -270,7 +270,6 @@ def format_target(penalty, target_in: np.ndarray, idx: int) -> np.ndarray: def get_control_modificator(ocp, _penalty, index: int): - current_phase = ocp.nlp[_penalty.nodes_phase[index]] current_node = _penalty.nodes[index] phase_dynamics = current_phase.phase_dynamics @@ -283,11 +282,9 @@ def get_control_modificator(ocp, _penalty, index: int): def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): - """ - """ + """ """ if _penalty.transition: - ocp = interface.ocp u0_mode = get_control_modificator(ocp, _penalty, 0) @@ -328,7 +325,9 @@ def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): len_u_1 = u_1.shape[0] if is_shared_dynamics_0 or is_node0_within_control_limit: - should_apply_fake_padding_on_u0 = len_u_1 > len_u_0 and (is_node1_within_control_limit or is_shared_dynamics_1) + should_apply_fake_padding_on_u0 = len_u_1 > len_u_0 and ( + is_node1_within_control_limit or is_shared_dynamics_1 + ) if should_apply_fake_padding_on_u0: fake_padding = interface.ocp.cx(len_u_1 - len_u_0, 1) _u_0 = vertcat(u_0, fake_padding) @@ -338,7 +337,9 @@ def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): _u_0 = [] if is_shared_dynamics_1 or is_node1_within_control_limit: - should_apply_fake_padding_on_u1 = len_u_0 > len_u_1 and (is_node0_within_control_limit or is_shared_dynamics_0) + should_apply_fake_padding_on_u1 = len_u_0 > len_u_1 and ( + is_node0_within_control_limit or is_shared_dynamics_0 + ) if should_apply_fake_padding_on_u1: fake_padding = interface.ocp.cx(len_u_0 - len_u_1, 1) _u_1 = vertcat(u_1, fake_padding) @@ -390,7 +391,8 @@ def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): len_u_0 = _u_0.shape[0] len_u_1 = _u_1.shape[0] should_apply_fake_padding_on_u0 = len_u_1 > len_u_0 and ( - is_node1_within_control_limit or is_shared_dynamics_1) + is_node1_within_control_limit or is_shared_dynamics_1 + ) if should_apply_fake_padding_on_u0: fake_padding = interface.ocp.cx(len_u_1 - len_u_0, 1) _u_0 = vertcat(_u_0, fake_padding) @@ -401,7 +403,8 @@ def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): len_u_0 = _u_0.shape[0] len_u_1 = _u_1.shape[0] should_apply_fake_padding_on_u1 = len_u_0 > len_u_1 and ( - is_node0_within_control_limit or is_shared_dynamics_0) + is_node0_within_control_limit or is_shared_dynamics_0 + ) if should_apply_fake_padding_on_u1: fake_padding = interface.ocp.cx(len_u_0 - len_u_1, 1) _u_1 = vertcat(_u_1, fake_padding) @@ -434,7 +437,7 @@ def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): for i in range(len(_penalty.nodes_phase)): nlp_i = ocp.nlp[_penalty.nodes_phase[i]] index_i = _penalty.multinode_idx[i] - ui_mode = get_control_modificator(ocp, _penalty=_penalty,index=i) + ui_mode = get_control_modificator(ocp, _penalty=_penalty, index=i) if is_unscaled: _x_tp = nlp_i.cx() @@ -458,8 +461,7 @@ def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): _x_tp = vertcat(_x_tp, nlp_i.X_scaled[index_i][:, i]) _u_tp = ( nlp_i.U_scaled[index_i - ui_mode] - if nlp_i.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE - or index_i < len(nlp_i.U_scaled) + if nlp_i.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE or index_i < len(nlp_i.U_scaled) else [] ) _s_tp = nlp_i.S_scaled[index_i] @@ -493,8 +495,8 @@ def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): if is_unscaled: _x = nlp.cx() if ( - _penalty.integration_rule == QuadratureRule.APPROXIMATE_TRAPEZOIDAL - or _penalty.integration_rule == QuadratureRule.TRAPEZOIDAL + _penalty.integration_rule == QuadratureRule.APPROXIMATE_TRAPEZOIDAL + or _penalty.integration_rule == QuadratureRule.TRAPEZOIDAL ): _x = vertcat(_x, nlp.X[_idx][:, 0]) else: @@ -503,11 +505,11 @@ def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): # Watch out, this is ok for all of our current built-in functions, but it is not generally ok to do that if ( - _idx == nlp.ns - and nlp.ode_solver.is_direct_collocation - and nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE - and _penalty.node[0] != Node.END - and _penalty.integration_rule != QuadratureRule.APPROXIMATE_TRAPEZOIDAL + _idx == nlp.ns + and nlp.ode_solver.is_direct_collocation + and nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE + and _penalty.node[0] != Node.END + and _penalty.integration_rule != QuadratureRule.APPROXIMATE_TRAPEZOIDAL ): for i in range(1, nlp.X[_idx - 1].shape[1]): _x = vertcat(_x, nlp.X[_idx - 1][:, i]) @@ -521,8 +523,8 @@ def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): else: _x = nlp.cx() if ( - _penalty.integration_rule == QuadratureRule.APPROXIMATE_TRAPEZOIDAL - or _penalty.integration_rule == QuadratureRule.TRAPEZOIDAL + _penalty.integration_rule == QuadratureRule.APPROXIMATE_TRAPEZOIDAL + or _penalty.integration_rule == QuadratureRule.TRAPEZOIDAL ): _x = vertcat(_x, nlp.X_scaled[_idx][:, 0]) else: @@ -531,11 +533,11 @@ def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): # Watch out, this is ok for all of our current built-in functions, but it is not generally ok to do that if ( - _idx == nlp.ns - and nlp.ode_solver.is_direct_collocation - and nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE - and _penalty.node[0] != Node.END - and _penalty.integration_rule != QuadratureRule.APPROXIMATE_TRAPEZOIDAL + _idx == nlp.ns + and nlp.ode_solver.is_direct_collocation + and nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE + and _penalty.node[0] != Node.END + and _penalty.integration_rule != QuadratureRule.APPROXIMATE_TRAPEZOIDAL ): for i in range(1, nlp.X_scaled[_idx - 1].shape[1]): _x = vertcat(_x, nlp.X_scaled[_idx - 1][:, i]) From b9ef3c47e490c9e7716909a08ddee79677076c50 Mon Sep 17 00:00:00 2001 From: Ipuch Date: Sun, 5 Nov 2023 09:06:08 -0500 Subject: [PATCH 08/17] fixing the bug --- bioptim/interfaces/interface_utils.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/bioptim/interfaces/interface_utils.py b/bioptim/interfaces/interface_utils.py index 4e33c5d62..e800ffb4a 100644 --- a/bioptim/interfaces/interface_utils.py +++ b/bioptim/interfaces/interface_utils.py @@ -382,26 +382,25 @@ def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): is_shared_dynamics_1 = all_nlp[phase_node1].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE is_node1_within_control_limit = node_idx_1 < len(all_nlp[phase_node1].U_scaled) + len_u_0 = all_nlp[phase_node0].U_scaled[0].shape[0] + len_u_1 = all_nlp[phase_node1].U_scaled[0].shape[0] + _u_0 = [] _u_1 = [] if is_shared_dynamics_0 or is_node0_within_control_limit: - _u_0 = all_nlp[phase_node0].U_scaled[node_idx_0 - u0_mode] - _u_1 = all_nlp[phase_node1].U_scaled[node_idx_1 - u1_mode] - len_u_0 = _u_0.shape[0] - len_u_1 = _u_1.shape[0] should_apply_fake_padding_on_u0 = len_u_1 > len_u_0 and ( is_node1_within_control_limit or is_shared_dynamics_1 ) + _u_0 = all_nlp[phase_node0].U_scaled[node_idx_0 - u0_mode] + if should_apply_fake_padding_on_u0: fake_padding = interface.ocp.cx(len_u_1 - len_u_0, 1) _u_0 = vertcat(_u_0, fake_padding) if is_shared_dynamics_1 or is_node1_within_control_limit: - _u_0 = all_nlp[phase_node0].U_scaled[node_idx_0 - u0_mode] _u_1 = all_nlp[phase_node1].U_scaled[node_idx_1 - u1_mode] - len_u_0 = _u_0.shape[0] - len_u_1 = _u_1.shape[0] + should_apply_fake_padding_on_u1 = len_u_0 > len_u_1 and ( is_node0_within_control_limit or is_shared_dynamics_0 ) From 84ef66c76756d906b36d8dd7f16fc8da34129ae4 Mon Sep 17 00:00:00 2001 From: Ipuch Date: Sun, 5 Nov 2023 09:26:06 -0500 Subject: [PATCH 09/17] harmonizing unscaled and scaled --- bioptim/interfaces/interface_utils.py | 61 ++++++++++++++++++++------- 1 file changed, 45 insertions(+), 16 deletions(-) diff --git a/bioptim/interfaces/interface_utils.py b/bioptim/interfaces/interface_utils.py index e800ffb4a..0e47bf2c7 100644 --- a/bioptim/interfaces/interface_utils.py +++ b/bioptim/interfaces/interface_utils.py @@ -318,35 +318,31 @@ def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): is_shared_dynamics_1 = all_nlp[phase_node1].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE is_node1_within_control_limit = node_idx_1 < len(all_nlp[phase_node1].U) - u_0 = all_nlp[phase_node0].U[node_idx_0 - u0_mode] - u_1 = all_nlp[phase_node0].U[node_idx_1 - u1_mode] + len_u_0 = all_nlp[phase_node0].U[0].shape[0] + len_u_1 = all_nlp[phase_node1].U[0].shape[0] - len_u_0 = u_0.shape[0] - len_u_1 = u_1.shape[0] + _u_0 = [] + _u_1 = [] if is_shared_dynamics_0 or is_node0_within_control_limit: should_apply_fake_padding_on_u0 = len_u_1 > len_u_0 and ( - is_node1_within_control_limit or is_shared_dynamics_1 + is_node1_within_control_limit or is_shared_dynamics_1 ) + _u_0 = all_nlp[phase_node0].U_scaled[node_idx_0 - u0_mode] + if should_apply_fake_padding_on_u0: fake_padding = interface.ocp.cx(len_u_1 - len_u_0, 1) - _u_0 = vertcat(u_0, fake_padding) - else: - _u_0 = u_0 - else: - _u_0 = [] + _u_0 = vertcat(_u_0, fake_padding) if is_shared_dynamics_1 or is_node1_within_control_limit: + _u_1 = all_nlp[phase_node1].U_scaled[node_idx_1 - u1_mode] + should_apply_fake_padding_on_u1 = len_u_0 > len_u_1 and ( - is_node0_within_control_limit or is_shared_dynamics_0 + is_node0_within_control_limit or is_shared_dynamics_0 ) if should_apply_fake_padding_on_u1: fake_padding = interface.ocp.cx(len_u_0 - len_u_1, 1) - _u_1 = vertcat(u_1, fake_padding) - else: - _u_1 = u_1 - else: - _u_1 = [] + _u_1 = vertcat(_u_1, fake_padding) _s_0 = all_nlp[phase_node0].S[node_idx_0] _s_1 = all_nlp[phase_node1].S[node_idx_1] @@ -641,3 +637,36 @@ def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): ) _u = vertcat(_u, u) return _x, _u, _s + + +def get_padded_array(nlp, attribute, node_idx, target_length, casadi_constructor) -> SX | MX: + """ + Get a padded array of the correct length + + Parameters + ---------- + nlp: NonLinearProgram + The current phase + attribute: str + The attribute to get the array from + node_idx: int + The node index + target_length: int + The target length of the array + casadi_constructor: Callable + The casadi constructor to use that either build SX or MX + + Returns + ------- + SX | MX + The padded array + """ + + padded_array = getattr(nlp, attribute)[node_idx][:, 0] + len_x = padded_array.shape[0] + + if len_x > target_length: + fake_padding = casadi_constructor(len_x - target_length, 1) + padded_array = vertcat(padded_array, fake_padding) + + return padded_array From 3638764bfcf9f94f93114d9c64e961ffe4dfd0ad Mon Sep 17 00:00:00 2001 From: Ipuch Date: Sun, 5 Nov 2023 09:57:56 -0500 Subject: [PATCH 10/17] calling that sub function properly --- bioptim/interfaces/interface_utils.py | 115 ++++++++++++++------------ 1 file changed, 61 insertions(+), 54 deletions(-) diff --git a/bioptim/interfaces/interface_utils.py b/bioptim/interfaces/interface_utils.py index 0e47bf2c7..f84e94362 100644 --- a/bioptim/interfaces/interface_utils.py +++ b/bioptim/interfaces/interface_utils.py @@ -1,4 +1,5 @@ from time import perf_counter +from typing import Callable from sys import platform from casadi import Importer @@ -286,6 +287,7 @@ def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): if _penalty.transition: ocp = interface.ocp + cx = interface.ocp.cx u0_mode = get_control_modificator(ocp, _penalty, 0) u1_mode = get_control_modificator(ocp, _penalty, 1) @@ -299,19 +301,21 @@ def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): node_idx_1 = _penalty.all_nodes_index[1] if is_unscaled: - _x_0 = all_nlp[phase_node0].X[node_idx_0][:, 0] - _x_1 = all_nlp[phase_node1].X[node_idx_1][:, 0] - len_x_0 = _x_0.shape[0] - len_x_1 = _x_1.shape[0] - - if len_x_1 > len_x_0: - fake_padding = interface.ocp.cx(len_x_1 - len_x_0, 1) - _x_0 = vertcat(_x_0, fake_padding) - - if len_x_0 > len_x_1: - fake_padding = interface.ocp.cx(len_x_0 - len_x_1, 1) - _x_1 = vertcat(_x_1, fake_padding) + _x_0 = get_padded_array( + nlp=all_nlp[phase_node0], + attribute="X", + node_idx=node_idx_0, + target_length=all_nlp[phase_node1].X[node_idx_1].shape[0], + casadi_constructor=cx, + ) + _x_1 = get_padded_array( + nlp=all_nlp[phase_node1], + attribute="X", + node_idx=node_idx_1, + target_length=all_nlp[phase_node0].X[node_idx_0].shape[0], + casadi_constructor=cx, + ) is_shared_dynamics_0 = all_nlp[phase_node0].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE is_node0_within_control_limit = node_idx_0 < len(all_nlp[phase_node0].U) @@ -344,34 +348,36 @@ def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): fake_padding = interface.ocp.cx(len_u_0 - len_u_1, 1) _u_1 = vertcat(_u_1, fake_padding) - _s_0 = all_nlp[phase_node0].S[node_idx_0] - _s_1 = all_nlp[phase_node1].S[node_idx_1] - - len_s_0 = _s_0.shape[0] - len_s_1 = _s_1.shape[0] - - if len_s_1 > len_s_0: - fake_padding = interface.ocp.cx(len_s_1 - len_s_0, 1) - _s_0 = vertcat(_s_0, fake_padding) - - if len_s_0 > len_s_1: - fake_padding = interface.ocp.cx(len_s_0 - len_s_1, 1) - _s_1 = vertcat(_s_1, fake_padding) + _s_0 = get_padded_array( + nlp=all_nlp[phase_node0], + attribute="S", + node_idx=node_idx_0, + target_length=all_nlp[phase_node1].S[node_idx_1].shape[0], + casadi_constructor=cx, + ) + _s_1 = get_padded_array( + nlp=all_nlp[phase_node1], + attribute="S", + node_idx=node_idx_1, + target_length=all_nlp[phase_node0].S[node_idx_0].shape[0], + casadi_constructor=cx, + ) else: - _x_0 = all_nlp[phase_node0].X_scaled[node_idx_0][:, 0] - _x_1 = all_nlp[phase_node1].X_scaled[node_idx_1][:, 0] - - len_x_0 = _x_0.shape[0] - len_x_1 = _x_1.shape[0] - - if len_x_1 > len_x_0: - fake_padding = interface.ocp.cx(len_x_1 - len_x_0, 1) - _x_0 = vertcat(_x_0, fake_padding) - - if len_x_0 > len_x_1: - fake_padding = interface.ocp.cx(len_x_0 - len_x_1, 1) - _x_1 = vertcat(_x_1, fake_padding) + _x_0 = get_padded_array( + nlp=all_nlp[phase_node0], + attribute="X_scaled", + node_idx=node_idx_0, + target_length=all_nlp[phase_node1].X_scaled[node_idx_1].shape[0], + casadi_constructor=cx, + ) + _x_1 = get_padded_array( + nlp=all_nlp[phase_node1], + attribute="X_scaled", + node_idx=node_idx_1, + target_length=all_nlp[phase_node0].X_scaled[node_idx_0].shape[0], + casadi_constructor=cx, + ) is_shared_dynamics_0 = all_nlp[phase_node0].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE is_node0_within_control_limit = node_idx_0 < len(all_nlp[phase_node0].U_scaled) @@ -404,19 +410,20 @@ def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): fake_padding = interface.ocp.cx(len_u_0 - len_u_1, 1) _u_1 = vertcat(_u_1, fake_padding) - _s_0 = all_nlp[phase_node0].S_scaled[node_idx_0] - _s_1 = all_nlp[phase_node1].S_scaled[node_idx_1] - - len_s_0 = _s_0.shape[0] - len_s_1 = _s_1.shape[0] - - if len_s_1 > len_s_0: - fake_padding = interface.ocp.cx(len_s_1 - len_s_0, 1) - _s_0 = vertcat(_s_0, fake_padding) - - if len_s_0 > len_s_1: - fake_padding = interface.ocp.cx(len_s_0 - len_s_1, 1) - _s_1 = vertcat(_s_1, fake_padding) + _s_0 = get_padded_array( + nlp=all_nlp[phase_node0], + attribute="S_scaled", + node_idx=node_idx_0, + target_length=all_nlp[phase_node1].S_scaled[node_idx_1].shape[0], + casadi_constructor=cx, + ) + _s_1 = get_padded_array( + nlp=all_nlp[phase_node1], + attribute="S_scaled", + node_idx=node_idx_1, + target_length=all_nlp[phase_node0].S_scaled[node_idx_0].shape[0], + casadi_constructor=cx, + ) _x = vertcat(_x_1, _x_0) _u = vertcat(_u_1, _u_0) @@ -639,7 +646,7 @@ def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): return _x, _u, _s -def get_padded_array(nlp, attribute, node_idx, target_length, casadi_constructor) -> SX | MX: +def get_padded_array(nlp, attribute:str, node_idx:int, target_length:int, casadi_constructor: Callable) -> SX | MX: """ Get a padded array of the correct length @@ -648,9 +655,9 @@ def get_padded_array(nlp, attribute, node_idx, target_length, casadi_constructor nlp: NonLinearProgram The current phase attribute: str - The attribute to get the array from + The attribute to get the array from such as "X", "X_scaled", "U", "U_scaled", "S", "S_scaled" node_idx: int - The node index + The node index in the current phase target_length: int The target length of the array casadi_constructor: Callable From e5e7013a5ecbe0a4e84fa3bfa493531bf94db040 Mon Sep 17 00:00:00 2001 From: Ipuch Date: Sun, 5 Nov 2023 10:11:39 -0500 Subject: [PATCH 11/17] black --- bioptim/interfaces/interface_utils.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/bioptim/interfaces/interface_utils.py b/bioptim/interfaces/interface_utils.py index f84e94362..095606d74 100644 --- a/bioptim/interfaces/interface_utils.py +++ b/bioptim/interfaces/interface_utils.py @@ -301,7 +301,6 @@ def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): node_idx_1 = _penalty.all_nodes_index[1] if is_unscaled: - _x_0 = get_padded_array( nlp=all_nlp[phase_node0], attribute="X", @@ -330,7 +329,7 @@ def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): if is_shared_dynamics_0 or is_node0_within_control_limit: should_apply_fake_padding_on_u0 = len_u_1 > len_u_0 and ( - is_node1_within_control_limit or is_shared_dynamics_1 + is_node1_within_control_limit or is_shared_dynamics_1 ) _u_0 = all_nlp[phase_node0].U_scaled[node_idx_0 - u0_mode] @@ -342,7 +341,7 @@ def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): _u_1 = all_nlp[phase_node1].U_scaled[node_idx_1 - u1_mode] should_apply_fake_padding_on_u1 = len_u_0 > len_u_1 and ( - is_node0_within_control_limit or is_shared_dynamics_0 + is_node0_within_control_limit or is_shared_dynamics_0 ) if should_apply_fake_padding_on_u1: fake_padding = interface.ocp.cx(len_u_0 - len_u_1, 1) @@ -646,7 +645,7 @@ def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): return _x, _u, _s -def get_padded_array(nlp, attribute:str, node_idx:int, target_length:int, casadi_constructor: Callable) -> SX | MX: +def get_padded_array(nlp, attribute: str, node_idx: int, target_length: int, casadi_constructor: Callable) -> SX | MX: """ Get a padded array of the correct length From a3e820d47e74f9f81b8ccefa7244a797d4528034 Mon Sep 17 00:00:00 2001 From: Ipuch Date: Sun, 5 Nov 2023 15:46:00 -0500 Subject: [PATCH 12/17] adding comments and cases --- bioptim/interfaces/interface_utils.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/bioptim/interfaces/interface_utils.py b/bioptim/interfaces/interface_utils.py index 095606d74..31a798a80 100644 --- a/bioptim/interfaces/interface_utils.py +++ b/bioptim/interfaces/interface_utils.py @@ -645,7 +645,7 @@ def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): return _x, _u, _s -def get_padded_array(nlp, attribute: str, node_idx: int, target_length: int, casadi_constructor: Callable) -> SX | MX: +def get_padded_array(nlp, attribute: str, node_idx: int, casadi_constructor: Callable, target_length: int = None) -> SX | MX: """ Get a padded array of the correct length @@ -658,7 +658,8 @@ def get_padded_array(nlp, attribute: str, node_idx: int, target_length: int, cas node_idx: int The node index in the current phase target_length: int - The target length of the array + The target length of the array, in some cases, one side can be longer than the other one + (e.g. when using uneven transition phase with a different of states between the two phases) casadi_constructor: Callable The casadi constructor to use that either build SX or MX @@ -667,12 +668,14 @@ def get_padded_array(nlp, attribute: str, node_idx: int, target_length: int, cas SX | MX The padded array """ - padded_array = getattr(nlp, attribute)[node_idx][:, 0] len_x = padded_array.shape[0] - if len_x > target_length: - fake_padding = casadi_constructor(len_x - target_length, 1) + if target_length is None: + target_length = len_x + + if target_length > len_x: + fake_padding = casadi_constructor(target_length - len_x, 1) padded_array = vertcat(padded_array, fake_padding) return padded_array From 3f83dbaa8fd7048f8b4b19166e9b754c3511ef0a Mon Sep 17 00:00:00 2001 From: Ipuch Date: Sun, 5 Nov 2023 16:06:35 -0500 Subject: [PATCH 13/17] get that unit test --- tests/shard6/test_unit_solver_interface.py | 70 ++++++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 tests/shard6/test_unit_solver_interface.py diff --git a/tests/shard6/test_unit_solver_interface.py b/tests/shard6/test_unit_solver_interface.py new file mode 100644 index 000000000..0a87d355c --- /dev/null +++ b/tests/shard6/test_unit_solver_interface.py @@ -0,0 +1,70 @@ +import pytest +from casadi import SX, MX, vertcat, Function +import numpy as np + +from bioptim import NonLinearProgram +from bioptim.interfaces.interface_utils import get_padded_array + + +@pytest.fixture +def nlp_sx(): + # Create a dummy NonLinearProgram object with necessary attributes + nlp = NonLinearProgram(None) + nlp.X = [SX([[1], [2], [3]])] + nlp.X_scaled = [SX([[4], [5], [6]])] + # Add more attributes as needed + return nlp + + +@pytest.fixture +def nlp_mx(): + # Create a dummy NonLinearProgram object with necessary attributes + nlp = NonLinearProgram(None) + nlp.X = [MX(np.array([[1], [2], [3]]))] + nlp.X_scaled = [MX(np.array([[4], [5], [6]]))] + # Add more attributes as needed + return nlp + + +def test_valid_input(nlp_sx): + result = get_padded_array(nlp_sx, 'X', 0, SX, 5) + expected = vertcat(SX([[1], [2], [3]]), SX(2, 1)) + assert (result - expected).is_zero() + + +def test_no_padding(nlp_sx): + result = get_padded_array(nlp_sx, 'X', 0, SX) + expected = SX([[1], [2], [3]]) + assert (result - expected).is_zero() + + +def test_custom_target_length(nlp_sx): + result = get_padded_array(nlp_sx, 'X', 0, SX, 4) + expected = vertcat(SX([[1], [2], [3]]), SX(1, 1)) + assert (result - expected).is_zero() + + +def test_invalid_attribute(nlp_sx): + with pytest.raises(AttributeError): + get_padded_array(nlp_sx, 'invalid_attribute', 0, SX) + + +def test_invalid_node_idx(nlp_sx): + with pytest.raises(IndexError): + get_padded_array(nlp_sx, 'X', 10, SX) + + +def test_sx(nlp_sx): + result_sx = get_padded_array(nlp_sx, 'X', 0, SX, 5) + expected = vertcat(SX([[1], [2], [3]]), SX(2, 1)) + assert (result_sx - expected).is_zero() + + +def test_mx(nlp_mx): + result_mx = get_padded_array(nlp_mx, 'X', 0, MX, 5) + expected = vertcat( + MX(np.array([[1], [2], [3]])), + MX(2,1) + ) + res = Function('test', [], [result_mx-expected]) + assert res()["o0"].is_zero() From 9140def0e63eab57e7ff2bf5aef00376767b4372 Mon Sep 17 00:00:00 2001 From: Ipuch Date: Sun, 5 Nov 2023 16:07:53 -0500 Subject: [PATCH 14/17] factoring by deleting lines --- bioptim/interfaces/interface_utils.py | 88 +++++++++------------------ 1 file changed, 30 insertions(+), 58 deletions(-) diff --git a/bioptim/interfaces/interface_utils.py b/bioptim/interfaces/interface_utils.py index 31a798a80..603f877bd 100644 --- a/bioptim/interfaces/interface_utils.py +++ b/bioptim/interfaces/interface_utils.py @@ -300,21 +300,37 @@ def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): node_idx_0 = _penalty.all_nodes_index[0] node_idx_1 = _penalty.all_nodes_index[1] + _x_0 = get_padded_array( + nlp=all_nlp[phase_node0], + attribute="X" if is_unscaled else "X_scaled", + node_idx=node_idx_0, + target_length=all_nlp[phase_node1].X_scaled[node_idx_1].shape[0], + casadi_constructor=cx, + ) + _x_1 = get_padded_array( + nlp=all_nlp[phase_node1], + attribute="X" if is_unscaled else "X_scaled", + node_idx=node_idx_1, + target_length=all_nlp[phase_node0].X_scaled[node_idx_0].shape[0], + casadi_constructor=cx, + ) + + _s_0 = get_padded_array( + nlp=all_nlp[phase_node0], + attribute="S" if is_unscaled else "S_scaled", + node_idx=node_idx_0, + target_length=all_nlp[phase_node1].S[node_idx_1].shape[0], + casadi_constructor=cx, + ) + _s_1 = get_padded_array( + nlp=all_nlp[phase_node1], + attribute="S" if is_unscaled else "S_scaled", + node_idx=node_idx_1, + target_length=all_nlp[phase_node0].S[node_idx_0].shape[0], + casadi_constructor=cx, + ) + if is_unscaled: - _x_0 = get_padded_array( - nlp=all_nlp[phase_node0], - attribute="X", - node_idx=node_idx_0, - target_length=all_nlp[phase_node1].X[node_idx_1].shape[0], - casadi_constructor=cx, - ) - _x_1 = get_padded_array( - nlp=all_nlp[phase_node1], - attribute="X", - node_idx=node_idx_1, - target_length=all_nlp[phase_node0].X[node_idx_0].shape[0], - casadi_constructor=cx, - ) is_shared_dynamics_0 = all_nlp[phase_node0].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE is_node0_within_control_limit = node_idx_0 < len(all_nlp[phase_node0].U) @@ -347,36 +363,7 @@ def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): fake_padding = interface.ocp.cx(len_u_0 - len_u_1, 1) _u_1 = vertcat(_u_1, fake_padding) - _s_0 = get_padded_array( - nlp=all_nlp[phase_node0], - attribute="S", - node_idx=node_idx_0, - target_length=all_nlp[phase_node1].S[node_idx_1].shape[0], - casadi_constructor=cx, - ) - _s_1 = get_padded_array( - nlp=all_nlp[phase_node1], - attribute="S", - node_idx=node_idx_1, - target_length=all_nlp[phase_node0].S[node_idx_0].shape[0], - casadi_constructor=cx, - ) - else: - _x_0 = get_padded_array( - nlp=all_nlp[phase_node0], - attribute="X_scaled", - node_idx=node_idx_0, - target_length=all_nlp[phase_node1].X_scaled[node_idx_1].shape[0], - casadi_constructor=cx, - ) - _x_1 = get_padded_array( - nlp=all_nlp[phase_node1], - attribute="X_scaled", - node_idx=node_idx_1, - target_length=all_nlp[phase_node0].X_scaled[node_idx_0].shape[0], - casadi_constructor=cx, - ) is_shared_dynamics_0 = all_nlp[phase_node0].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE is_node0_within_control_limit = node_idx_0 < len(all_nlp[phase_node0].U_scaled) @@ -409,21 +396,6 @@ def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): fake_padding = interface.ocp.cx(len_u_0 - len_u_1, 1) _u_1 = vertcat(_u_1, fake_padding) - _s_0 = get_padded_array( - nlp=all_nlp[phase_node0], - attribute="S_scaled", - node_idx=node_idx_0, - target_length=all_nlp[phase_node1].S_scaled[node_idx_1].shape[0], - casadi_constructor=cx, - ) - _s_1 = get_padded_array( - nlp=all_nlp[phase_node1], - attribute="S_scaled", - node_idx=node_idx_1, - target_length=all_nlp[phase_node0].S_scaled[node_idx_0].shape[0], - casadi_constructor=cx, - ) - _x = vertcat(_x_1, _x_0) _u = vertcat(_u_1, _u_0) _s = vertcat(_s_1, _s_0) From ab0e2dbac85db33b3f9e244855b72584aa9a6cc5 Mon Sep 17 00:00:00 2001 From: Ipuch Date: Sun, 5 Nov 2023 16:08:18 -0500 Subject: [PATCH 15/17] blck --- bioptim/interfaces/interface_utils.py | 6 +++--- tests/shard6/test_unit_solver_interface.py | 21 +++++++++------------ 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/bioptim/interfaces/interface_utils.py b/bioptim/interfaces/interface_utils.py index 603f877bd..c793508f9 100644 --- a/bioptim/interfaces/interface_utils.py +++ b/bioptim/interfaces/interface_utils.py @@ -331,7 +331,6 @@ def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): ) if is_unscaled: - is_shared_dynamics_0 = all_nlp[phase_node0].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE is_node0_within_control_limit = node_idx_0 < len(all_nlp[phase_node0].U) is_shared_dynamics_1 = all_nlp[phase_node1].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE @@ -364,7 +363,6 @@ def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): _u_1 = vertcat(_u_1, fake_padding) else: - is_shared_dynamics_0 = all_nlp[phase_node0].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE is_node0_within_control_limit = node_idx_0 < len(all_nlp[phase_node0].U_scaled) is_shared_dynamics_1 = all_nlp[phase_node1].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE @@ -617,7 +615,9 @@ def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): return _x, _u, _s -def get_padded_array(nlp, attribute: str, node_idx: int, casadi_constructor: Callable, target_length: int = None) -> SX | MX: +def get_padded_array( + nlp, attribute: str, node_idx: int, casadi_constructor: Callable, target_length: int = None +) -> SX | MX: """ Get a padded array of the correct length diff --git a/tests/shard6/test_unit_solver_interface.py b/tests/shard6/test_unit_solver_interface.py index 0a87d355c..4fe26ca66 100644 --- a/tests/shard6/test_unit_solver_interface.py +++ b/tests/shard6/test_unit_solver_interface.py @@ -27,44 +27,41 @@ def nlp_mx(): def test_valid_input(nlp_sx): - result = get_padded_array(nlp_sx, 'X', 0, SX, 5) + result = get_padded_array(nlp_sx, "X", 0, SX, 5) expected = vertcat(SX([[1], [2], [3]]), SX(2, 1)) assert (result - expected).is_zero() def test_no_padding(nlp_sx): - result = get_padded_array(nlp_sx, 'X', 0, SX) + result = get_padded_array(nlp_sx, "X", 0, SX) expected = SX([[1], [2], [3]]) assert (result - expected).is_zero() def test_custom_target_length(nlp_sx): - result = get_padded_array(nlp_sx, 'X', 0, SX, 4) + result = get_padded_array(nlp_sx, "X", 0, SX, 4) expected = vertcat(SX([[1], [2], [3]]), SX(1, 1)) assert (result - expected).is_zero() def test_invalid_attribute(nlp_sx): with pytest.raises(AttributeError): - get_padded_array(nlp_sx, 'invalid_attribute', 0, SX) + get_padded_array(nlp_sx, "invalid_attribute", 0, SX) def test_invalid_node_idx(nlp_sx): with pytest.raises(IndexError): - get_padded_array(nlp_sx, 'X', 10, SX) + get_padded_array(nlp_sx, "X", 10, SX) def test_sx(nlp_sx): - result_sx = get_padded_array(nlp_sx, 'X', 0, SX, 5) + result_sx = get_padded_array(nlp_sx, "X", 0, SX, 5) expected = vertcat(SX([[1], [2], [3]]), SX(2, 1)) assert (result_sx - expected).is_zero() def test_mx(nlp_mx): - result_mx = get_padded_array(nlp_mx, 'X', 0, MX, 5) - expected = vertcat( - MX(np.array([[1], [2], [3]])), - MX(2,1) - ) - res = Function('test', [], [result_mx-expected]) + result_mx = get_padded_array(nlp_mx, "X", 0, MX, 5) + expected = vertcat(MX(np.array([[1], [2], [3]])), MX(2, 1)) + res = Function("test", [], [result_mx - expected]) assert res()["o0"].is_zero() From fb901df2e8d7cf266966b8229c1b785cda9f10d0 Mon Sep 17 00:00:00 2001 From: Ipuch Date: Sun, 5 Nov 2023 17:11:12 -0500 Subject: [PATCH 16/17] All good and stop here :) --- bioptim/interfaces/interface_utils.py | 160 ++++++++++++--------- tests/shard6/test_unit_solver_interface.py | 74 +++++++++- 2 files changed, 168 insertions(+), 66 deletions(-) diff --git a/bioptim/interfaces/interface_utils.py b/bioptim/interfaces/interface_utils.py index c793508f9..2f1cdf103 100644 --- a/bioptim/interfaces/interface_utils.py +++ b/bioptim/interfaces/interface_utils.py @@ -289,9 +289,6 @@ def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): ocp = interface.ocp cx = interface.ocp.cx - u0_mode = get_control_modificator(ocp, _penalty, 0) - u1_mode = get_control_modificator(ocp, _penalty, 1) - all_nlp = interface.ocp.nlp phase_node0 = _penalty.nodes_phase[0] @@ -300,6 +297,9 @@ def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): node_idx_0 = _penalty.all_nodes_index[0] node_idx_1 = _penalty.all_nodes_index[1] + u0_mode = get_control_modificator(ocp, _penalty, 0) + u1_mode = get_control_modificator(ocp, _penalty, 1) + _x_0 = get_padded_array( nlp=all_nlp[phase_node0], attribute="X" if is_unscaled else "X_scaled", @@ -330,69 +330,34 @@ def get_x_u_s_at_idx(interface, nlp, _penalty, _idx, is_unscaled): casadi_constructor=cx, ) - if is_unscaled: - is_shared_dynamics_0 = all_nlp[phase_node0].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE - is_node0_within_control_limit = node_idx_0 < len(all_nlp[phase_node0].U) - is_shared_dynamics_1 = all_nlp[phase_node1].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE - is_node1_within_control_limit = node_idx_1 < len(all_nlp[phase_node1].U) - - len_u_0 = all_nlp[phase_node0].U[0].shape[0] - len_u_1 = all_nlp[phase_node1].U[0].shape[0] - - _u_0 = [] - _u_1 = [] - - if is_shared_dynamics_0 or is_node0_within_control_limit: - should_apply_fake_padding_on_u0 = len_u_1 > len_u_0 and ( - is_node1_within_control_limit or is_shared_dynamics_1 - ) - _u_0 = all_nlp[phase_node0].U_scaled[node_idx_0 - u0_mode] - - if should_apply_fake_padding_on_u0: - fake_padding = interface.ocp.cx(len_u_1 - len_u_0, 1) - _u_0 = vertcat(_u_0, fake_padding) - - if is_shared_dynamics_1 or is_node1_within_control_limit: - _u_1 = all_nlp[phase_node1].U_scaled[node_idx_1 - u1_mode] - - should_apply_fake_padding_on_u1 = len_u_0 > len_u_1 and ( - is_node0_within_control_limit or is_shared_dynamics_0 - ) - if should_apply_fake_padding_on_u1: - fake_padding = interface.ocp.cx(len_u_0 - len_u_1, 1) - _u_1 = vertcat(_u_1, fake_padding) - - else: - is_shared_dynamics_0 = all_nlp[phase_node0].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE - is_node0_within_control_limit = node_idx_0 < len(all_nlp[phase_node0].U_scaled) - is_shared_dynamics_1 = all_nlp[phase_node1].phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE - is_node1_within_control_limit = node_idx_1 < len(all_nlp[phase_node1].U_scaled) - - len_u_0 = all_nlp[phase_node0].U_scaled[0].shape[0] - len_u_1 = all_nlp[phase_node1].U_scaled[0].shape[0] - - _u_0 = [] - _u_1 = [] - - if is_shared_dynamics_0 or is_node0_within_control_limit: - should_apply_fake_padding_on_u0 = len_u_1 > len_u_0 and ( - is_node1_within_control_limit or is_shared_dynamics_1 - ) - _u_0 = all_nlp[phase_node0].U_scaled[node_idx_0 - u0_mode] - - if should_apply_fake_padding_on_u0: - fake_padding = interface.ocp.cx(len_u_1 - len_u_0, 1) - _u_0 = vertcat(_u_0, fake_padding) + is_shared_dynamics_0, is_node0_within_control_limit, len_u_0 = get_node_control_info( + all_nlp[phase_node0], node_idx_0, attribute="U" if is_unscaled else "U_scaled" + ) + is_shared_dynamics_1, is_node1_within_control_limit, len_u_1 = get_node_control_info( + all_nlp[phase_node1], node_idx_1, attribute="U" if is_unscaled else "U_scaled" + ) - if is_shared_dynamics_1 or is_node1_within_control_limit: - _u_1 = all_nlp[phase_node1].U_scaled[node_idx_1 - u1_mode] + _u_0 = get_padded_control_array( + all_nlp[phase_node0], + node_idx_0, + attribute="U" if is_unscaled else "U_scaled", + u_mode=u0_mode, + target_length=len_u_1, + is_shared_dynamics_target=is_shared_dynamics_1, + is_within_control_limit_target=is_node1_within_control_limit, + casadi_constructor=cx, + ) - should_apply_fake_padding_on_u1 = len_u_0 > len_u_1 and ( - is_node0_within_control_limit or is_shared_dynamics_0 - ) - if should_apply_fake_padding_on_u1: - fake_padding = interface.ocp.cx(len_u_0 - len_u_1, 1) - _u_1 = vertcat(_u_1, fake_padding) + _u_1 = get_padded_control_array( + all_nlp[phase_node1], + node_idx_1, + attribute="U" if is_unscaled else "U_scaled", + u_mode=u1_mode, + target_length=len_u_0, + is_shared_dynamics_target=is_shared_dynamics_0, + is_within_control_limit_target=is_node0_within_control_limit, + casadi_constructor=cx, + ) _x = vertcat(_x_1, _x_0) _u = vertcat(_u_1, _u_0) @@ -651,3 +616,70 @@ def get_padded_array( padded_array = vertcat(padded_array, fake_padding) return padded_array + + +def get_node_control_info(nlp, node_idx, attribute: str): + """This returns the information about the control at a given node to format controls properly""" + is_shared_dynamics = nlp.phase_dynamics == PhaseDynamics.SHARED_DURING_THE_PHASE + is_within_control_limit = node_idx < len(nlp.U_scaled) + len_u = getattr(nlp, attribute)[0].shape[0] + + return is_shared_dynamics, is_within_control_limit, len_u + + +def get_padded_control_array( + nlp, + node_idx: int, + u_mode: int, + attribute: str, + target_length: int, + is_within_control_limit_target: bool, + is_shared_dynamics_target: bool, + casadi_constructor: Callable, +): + """ + Get a padded array of the correct length + + Parameters + ---------- + nlp: NonLinearProgram + The current phase + node_idx: int + The node index in the current phase + u_mode: int + The control mode see get_control_modificator + attribute: str + The attribute to get the array from such as "X", "X_scaled", "U", "U_scaled", "S", "S_scaled" + target_length: int + The target length of the array, in some cases, one side can be longer than the other one + (e.g. when using uneven transition phase with a different of states between the two phases) + is_within_control_limit_target: bool + If the target node of a given phase is within the control limit + (e.g. when using uneven transition phase with a different of states between the two phases) + is_shared_dynamics_target: bool + If the target node of a given phase is shared during the phase + (e.g. when using uneven transition phase with a different of states between the two phases) + casadi_constructor: Callable + The casadi constructor to use that either build SX or MX + + Returns + ------- + SX | MX + The padded array + """ + + is_shared_dynamics, is_within_control_limit, len_u = get_node_control_info(nlp, node_idx, attribute=attribute) + + _u_sym = [] + + if is_shared_dynamics or is_within_control_limit: + should_apply_fake_padding_on_u_sym = target_length > len_u and ( + is_within_control_limit_target or is_shared_dynamics_target + ) + _u_sym = getattr(nlp, attribute)[node_idx - u_mode] + + if should_apply_fake_padding_on_u_sym: + fake_padding = casadi_constructor(target_length - len_u, 1) + _u_sym = vertcat(_u_sym, fake_padding) + + return _u_sym diff --git a/tests/shard6/test_unit_solver_interface.py b/tests/shard6/test_unit_solver_interface.py index 4fe26ca66..a75b9dcdf 100644 --- a/tests/shard6/test_unit_solver_interface.py +++ b/tests/shard6/test_unit_solver_interface.py @@ -2,8 +2,8 @@ from casadi import SX, MX, vertcat, Function import numpy as np -from bioptim import NonLinearProgram -from bioptim.interfaces.interface_utils import get_padded_array +from bioptim import NonLinearProgram, PhaseDynamics +from bioptim.interfaces.interface_utils import get_padded_array, get_node_control_info, get_padded_control_array @pytest.fixture @@ -65,3 +65,73 @@ def test_mx(nlp_mx): expected = vertcat(MX(np.array([[1], [2], [3]])), MX(2, 1)) res = Function("test", [], [result_mx - expected]) assert res()["o0"].is_zero() + +@pytest.fixture +def nlp_control_sx(): + nlp = NonLinearProgram(PhaseDynamics.SHARED_DURING_THE_PHASE) + nlp.U_scaled = [SX([[1], [2], [3]])] + return nlp + +@pytest.fixture +def nlp_control_mx(): + nlp = NonLinearProgram(PhaseDynamics.SHARED_DURING_THE_PHASE) + nlp.U_scaled = [MX(np.array([[1], [2], [3]]))] + return nlp + + +def test_get_node_control_info(nlp_control_sx): + is_shared_dynamics, is_within_control_limit, len_u = get_node_control_info(nlp_control_sx, 0, "U_scaled") + assert is_shared_dynamics + assert is_within_control_limit + assert len_u == 3 + + +def test_get_padded_control_array_sx(nlp_control_sx): + _u_sym = get_padded_control_array(nlp_control_sx, 0, 0, "U_scaled", 5, True, True, SX) + expected = vertcat(SX([[1], [2], [3]]), SX(2, 1)) + assert (_u_sym - expected).is_zero() + + +def test_get_padded_control_array_mx(nlp_control_mx): + _u_sym = get_padded_control_array(nlp_control_mx, 0, 0, "U_scaled", 5, True, True, MX) + expected = vertcat(MX(np.array([[1], [2], [3]])), MX(2, 1)) + res = Function("test", [], [_u_sym - expected]) + assert res()["o0"].is_zero() + + +def test_get_node_control_info_not_shared(nlp_control_sx): + nlp_control_sx.phase_dynamics = PhaseDynamics.ONE_PER_NODE + is_shared_dynamics, _, _ = get_node_control_info(nlp_control_sx, 0, "U_scaled") + assert not is_shared_dynamics + + +def test_get_node_control_info_outside_limit(nlp_control_sx): + is_shared_dynamics, is_within_control_limit, _ = get_node_control_info(nlp_control_sx, 10, "U_scaled") + assert not is_within_control_limit + + +def test_get_padded_control_array_no_padding_sx(nlp_control_sx): + _u_sym = get_padded_control_array(nlp_control_sx, 0, 0, "U_scaled", 3, True, True, SX) + expected = SX([[1], [2], [3]]) + assert (_u_sym - expected).is_zero() + + +def test_get_padded_control_array_different_u_mode_sx(nlp_control_sx): + _u_sym = get_padded_control_array(nlp_control_sx, 1, 1, "U_scaled", 5, True, True, SX) + expected = vertcat(SX([[1], [2], [3]]), SX(2, 1)) + assert (_u_sym - expected).is_zero() + + +def test_get_padded_control_array_no_shared_dynamics_sx(nlp_control_sx): + nlp_control_sx.phase_dynamics = PhaseDynamics.ONE_PER_NODE + _u_sym = get_padded_control_array(nlp_control_sx, 0, 0, "U_scaled", 5, True, False, SX) + expected = vertcat(SX([[1], [2], [3]]), SX(2, 1)) + assert (_u_sym - expected).is_zero() + + +def test_get_padded_control_array_outside_control_limit_sx(nlp_control_sx): + with pytest.raises(IndexError): + get_padded_control_array(nlp_control_sx, 10, 0, "U_scaled", 5, False, True, SX) + + _u_sym = get_padded_control_array(nlp_control_sx, 0, 0, "U_scaled", 5, False, False, SX) + assert (_u_sym - SX([[1], [2], [3]])).is_zero() From 9a75586124ccdf2cacc4bfd8d58063092bb75726 Mon Sep 17 00:00:00 2001 From: Ipuch Date: Sun, 5 Nov 2023 17:13:00 -0500 Subject: [PATCH 17/17] blck --- bioptim/interfaces/interface_utils.py | 18 +++++++++--------- tests/shard6/test_unit_solver_interface.py | 2 ++ 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/bioptim/interfaces/interface_utils.py b/bioptim/interfaces/interface_utils.py index 2f1cdf103..95ca5aecc 100644 --- a/bioptim/interfaces/interface_utils.py +++ b/bioptim/interfaces/interface_utils.py @@ -628,14 +628,14 @@ def get_node_control_info(nlp, node_idx, attribute: str): def get_padded_control_array( - nlp, - node_idx: int, - u_mode: int, - attribute: str, - target_length: int, - is_within_control_limit_target: bool, - is_shared_dynamics_target: bool, - casadi_constructor: Callable, + nlp, + node_idx: int, + u_mode: int, + attribute: str, + target_length: int, + is_within_control_limit_target: bool, + is_shared_dynamics_target: bool, + casadi_constructor: Callable, ): """ Get a padded array of the correct length @@ -674,7 +674,7 @@ def get_padded_control_array( if is_shared_dynamics or is_within_control_limit: should_apply_fake_padding_on_u_sym = target_length > len_u and ( - is_within_control_limit_target or is_shared_dynamics_target + is_within_control_limit_target or is_shared_dynamics_target ) _u_sym = getattr(nlp, attribute)[node_idx - u_mode] diff --git a/tests/shard6/test_unit_solver_interface.py b/tests/shard6/test_unit_solver_interface.py index a75b9dcdf..5656668fd 100644 --- a/tests/shard6/test_unit_solver_interface.py +++ b/tests/shard6/test_unit_solver_interface.py @@ -66,12 +66,14 @@ def test_mx(nlp_mx): res = Function("test", [], [result_mx - expected]) assert res()["o0"].is_zero() + @pytest.fixture def nlp_control_sx(): nlp = NonLinearProgram(PhaseDynamics.SHARED_DURING_THE_PHASE) nlp.U_scaled = [SX([[1], [2], [3]])] return nlp + @pytest.fixture def nlp_control_mx(): nlp = NonLinearProgram(PhaseDynamics.SHARED_DURING_THE_PHASE)