In [1]:
# from regelum.scenario import Scenario
from regelum.simulator import CasADi
import numpy as np
from tqdm.notebook import tqdm

import sys
sys.path.append('../')

from src.system import HydraulicSystem, HydraulicSystemStationary
from src.observer import HydraulicStationaryObserver

In [2]:
from regelum.policy import Policy
from regelum.utils import rg
from regelum.model import ModelWeightContainer
from regelum.system import ComposedSystem, System
# from regelum.optimizable.core.configs import CasadiOptimizerConfig
from numpy import ndarray
from regelum.model import Model, ModelNN, ModelQuadLin
from regelum.optimizable.core.configs import OptimizerConfig
from regelum.objective import mpc_objective
from regelum.predictor import EulerPredictor
from regelum.objective import RunningObjective
from regelum.data_buffers import DataBuffer

In [3]:
from regelum.callback import (
    ScenarioStepLogger,
    # StateTracker,
    # HistoricalDataCallback,
)
from src.callback import HistoricalDataCallback, SimulatorStepLogger

from regelum import set_ipython_env
%matplotlib inline
callbacks = [SimulatorStepLogger, ScenarioStepLogger, HistoricalDataCallback]
ScenarioStepLogger.cooldown = 0.01
callbacks = set_ipython_env(callbacks=callbacks, interactive=True)

In [4]:
from regelum.model import ModelQuadLin

# from regelum.typing import RgArray
# from regelum.objective import Objective

# class HydraulicObjective(Objective):
#     def __init__(
#         self,
#         system: HydraulicSystemModel
#     ):
#         self.l_crit = system._parameters["l_crit"]
    
#     def __call__(
#         self, 
#         observation: RgArray, 
#         action: RgArray,
#         is_save_batch_format = False,
#     ):
#         # return rg.array(
#         #     [[(observation[:,0] - self.l_crit)**2]],
#         #     prototype=observation
#         # )
        
#         result = (observation[:,0] - self.l_crit)**2
        
#         return result[None, :]
        
#         # return rg.array(
#         #     [result],
#         #     prototype=observation
#         # )

class HydraulicObjectiveModel(ModelQuadLin):
    
    def __init__(
        self,
        system: HydraulicSystemStationary,
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        
        self.l_crit = system._parameters["l_crit"]
    
    def forward(self, inputs, weights=None):
        if weights is None:
            weights = self.weights
            quad_matrix = self._quad_matrix
            linear_coefs = self._linear_coefs
            # if isinstance(inputs, torch.Tensor):
            #     quad_matrix = torch.FloatTensor(quad_matrix).to(inputs.device)
            #     if linear_coefs is not None:
            #         linear_coefs = torch.FloatTensor(linear_coefs).to(inputs.device)
        else:
            quad_matrix, linear_coefs = self.get_quad_lin(weights)

        substract = rg.zeros(
            inputs.shape,
            prototype=inputs
        )
        
        substract[:,0] = self.l_crit
        
        inputs -= substract

        return ModelQuadLin.quadratic_linear_form(
            inputs,
            quad_matrix,
            linear_coefs,
        )

In [5]:
MAX_STEP = 1e-7
PREDICTION_HORIZON = 10
# PREDICTION_HORIZON = 100
FINAL_TIME = 10e-3
SAMPLING_TIME = 1e-3
ACTION_SIZE = 1

pred_step_size = 1 * SAMPLING_TIME

MAX_ITER = 5000

import casadi

class CasadiOptimizerConfig(OptimizerConfig):
    """Config for casadi-based optimizers."""

    def __init__(
        self,
        batch_size=1,
    ) -> None:
        """Instantiate CasadiOptimizerConfig object.

        Args:
            batch_size (int, optional): How many latest samples to use
                from `DataBuffer`, defaults to 1
        """
        super().__init__(
            kind="symbolic",
            opt_method="ipopt",
            opt_options={"print_level": 0, "max_iter": MAX_ITER},
            log_options={"print_in": False, "print_out": False, "print_time": False},
            config_options={
                "data_buffer_sampling_method": "sample_last",
                "data_buffer_sampling_kwargs": {
                    "n_samples": batch_size,
                    "dtype": casadi.DM,
                },
            },
        )


# Define the initial state (initial position of the kinematic point).
p_atm = 1e5
initial_state = rg.array([1e3, 0, 0, p_atm, p_atm])

model = ModelWeightContainer(
    dim_output=ACTION_SIZE, 
    weights_init=rg.zeros((PREDICTION_HORIZON + 1, ACTION_SIZE), rc_type=rg.CASADI)
)  # A trivial model contains our predicted actions

system = HydraulicSystem(
    init_state=initial_state
)
system_model = HydraulicSystemStationary(
    init_state=initial_state
)
observer = HydraulicStationaryObserver(
    system=system_model
)
predictor = EulerPredictor(
    system=system_model, pred_step_size=pred_step_size
)  # Predictor is used for MPC-prediction

# RUNNING OBJECTIVE
running_objective_model = HydraulicObjectiveModel(
    system=system_model,
    quad_matrix_type="diagonal", weights=[1, 0, 0]
)  # A quadratic model for running (a.k.a. instant) objective. We 
running_objective = RunningObjective(
    model=running_objective_model
)  # Wrap `running_objective_model` into `RunningObjective` class

class MPC(Policy):

    def __init__(
        self,
        model: Model | ModelNN = model,
        system: System | ComposedSystem = system_model,
        action_bounds: list | ndarray | None = system_model.action_bounds,
        optimizer_config: OptimizerConfig | None = CasadiOptimizerConfig(),
        discount_factor: float | None = 1,
        epsilon_random_parameter: float | None = None,
        prediction_horizon: int = PREDICTION_HORIZON,
        running_objective: RunningObjective = running_objective,
        predictor: EulerPredictor = predictor,
    ):
        """Instantiate MPC policy."""
        super().__init__(
            model,
            system,
            action_bounds,
            optimizer_config,
            discount_factor,
            epsilon_random_parameter,
        )  # Initialize the parent `Policy` class.

        self.prediction_horizon = prediction_horizon
        self.running_objective = running_objective
        self.predictor = predictor
        ###### Define the optimization problem
        self.observation_var = self.create_variable(
            1,  # dimensionality of axis 0
            self.system.dim_observation,  # dimensionality of axis 1
            name="observation",
            is_constant=True,  # is_constant set to `True` as `observation` is a constant parameter of optimization
        )
        self.est_state_var = self.create_variable(
            1,  # dimensionality of axis 0
            self.system.dim_state,  # dimensionality of axis 1
            name="estimated_state",
            is_constant=True,  # is_constant is set to `True` as `estimated_state` is a constant parameter of optimization
        )
        self.policy_model_weights_var = self.create_variable(
            name="policy_model_weights",
            is_constant=False,  # is_constant is set to False because policy_model_weights is a decision variable in our optimization problem
            like=self.model.named_parameters,  # like parameter utilizes the dimensions of the model's weights for compatibility
        )
        ## Let us register bounds for policy model weights to be within action bounds
        (
            self.action_bounds_tiled,
            self.action_initial_guess,
            self.action_min,
            self.action_max,
        ) = self.handle_bounds(
            self.action_bounds,
            self.dim_action,
            tile_parameter=self.model.weights.shape[0],
        )
        self.register_bounds(self.policy_model_weights_var, self.action_bounds_tiled)

        ## Make `Optimizable` aware of objective function and variables it depends on
        self.register_objective(
            self.cost,
            variables=[
                self.observation_var,
                self.est_state_var,
                self.policy_model_weights_var,
            ],
        )

    def optimize(self, databuffer: DataBuffer) -> None:
        """Define optimization routine for `Optimizable` class."""
        new_weights = super().optimize_symbolic(
            **databuffer.get_optimization_kwargs(
                keys=["observation", "estimated_state"],
                optimizer_config=self.optimizer_config,
            ),
            policy_model_weights=self.policy_model_weights_var(),
        )[
            "policy_model_weights"
        ]  # Get the optimized weights from `Optimizable` class
        self.model.update_and_cache_weights(new_weights)

    def cost(self, observation, estimated_state, policy_model_weights):
        """Cost function for MPC Policy."""
        return mpc_objective(
            observation=observation,
            estimated_state=estimated_state,
            policy_model_weights=policy_model_weights,
            discount_factor=self.discount_factor,
            running_objective=self.running_objective,
            prediction_horizon=self.prediction_horizon,
            predictor=self.predictor,
            model=self.model,
        )  # Call `mpc_objective` function to get the cost of current state and sequence of predicted actions

In [6]:
from regelum.critic import CriticTrivial
from regelum.scenario import RLScenario
# from src.scenario_temp import RLScenario
from regelum.event import Event


simulator = CasADi(
    system=system,
    state_init=initial_state,
    action_init=rg.array([0]),
    time_final=FINAL_TIME,
    max_step=MAX_STEP,
    # first_step=1e-6,
    # atol=1e-5,
    # rtol=1e-3,
)

scenario = RLScenario(
    policy=MPC(),
    observer = observer,
    critic=CriticTrivial(),  # mocked critic
    running_objective=running_objective,
    policy_optimization_event=Event.compute_action,  # run policy optimization on each step
    discount_factor=1.0,
    sampling_time=SAMPLING_TIME,
    simulator=simulator,
    N_episodes=1,
    N_iterations=1,
)

In [7]:
# the output of the cell is omitted
scenario.run()


******************************************************************************
This program contains Ipopt, a library for large-scale nonlinear optimization.
 Ipopt is released as open source code under the Eclipse Public License (EPL).
         For more information visit https://github.com/coin-or/Ipopt
******************************************************************************

