# Model
This example is from Galvanin et al. 2007, *Model-Based Design of Parallel Experiments*.

$$
\begin{align}
    \frac{dx_1}{dt} &= (r - u_1 - \theta_4)x_1 \\
    \frac{dx_2}{dt} &= - \frac{rx_1}{\theta_3} + u_1(u_2 - x_2)\\
    r &= \frac{\theta_1 x_2}{\theta_2 + x_2}
\end{align}
$$

Where,  <br>
$x_1$ = biomass concentration ($g/L$). <br>
$x_2$ = substrate concentration ($g/L$).  <br>
$u_1$ = dilution factor ($h^{-1}$).  <br>
$u_2$ = substrate concentration in the feed ($g/L$).  <br>

**Experimental conditions** <br>
$x_1^0$ = initial biomass concentration ($1-10\; g/L$). <br>
$u_1 (t)$ = dilution factor ($0.05 - 0.20\; h^{-1}$).  <br>
$u_2 (t)$ = substrate concentration in the feed ($5-35\; g/L$).  <br>


**Measurements** <br>
$x_1 (t)$ = biomass concentration ($g/L$) <br>
$x_2 (t)$ = substrate concentration ($g/L$)  <br>

**Parameters** <br>
$\theta_1 (h^{-1}), \theta_2 (g/L), \theta_3 (\text{dimensionless}), \text{ \& } \theta_4 (h^{-1})$ <br>

**Other information** <br>
$x_2^0$ = initial substrate concentration ($0\; g/L$), cannot be manipulated. <br>
Total duration of the experiment, $\tau$ = $40\; h$ (fixed). <br>
Each experimental run involves 5 sampling times. <br>
Inputs $\mathbf{u}(t)$ can be manipulated and represented as a piece-wise constant profile over 5 switching intervals. <br>
The sampling times and the control variables switching times can be different. <br>
The elapsed time between any two sampling points is allowed to be between 0.1 and 20 h, 
and the duration of each control interval is allowed to be between 0.2 and 20 h. <br>

# Model with True parameters

In [5]:
import pyomo.environ as pyo
from pyomo.dae import ContinuousSet, DerivativeVar, Simulator
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [69]:
import pyomo.environ as pyo
from pyomo.dae import ContinuousSet, DerivativeVar
from pyomo.dae.simulator import Simulator
import numpy as np
import pandas as pd

# True parameter values for the mechanistic model
theta_true = [0.310, 0.180, 0.550, 0.050]


def create_biomass_model(
    time_horizon=40, theta_initial=None
):
    """Creates a Pyomo model for biomass growth."""
    m = pyo.ConcreteModel()

    # Times Set
    m.t = ContinuousSet(bounds=(0, time_horizon))

    # Parameters (fixed during simulation)
    m.theta1 = pyo.Var(initialize=theta_initial[0] if theta_initial else 1, domain=pyo.PositiveReals)
    m.theta2 = pyo.Var(initialize=theta_initial[1] if theta_initial else 1, domain=pyo.PositiveReals)
    m.theta3 = pyo.Var(initialize=theta_initial[2] if theta_initial else 1, domain=pyo.PositiveReals)
    m.theta4 = pyo.Var(initialize=theta_initial[3] if theta_initial else 1, domain=pyo.PositiveReals)

    # Time-varying inputs
    m.u1 = pyo.Var(m.t)
    m.u2 = pyo.Var(m.t)

    # State Variables
    m.x1 = pyo.Var(m.t, domain=pyo.NonNegativeReals)
    m.x2 = pyo.Var(m.t, domain=pyo.NonNegativeReals)

    # Derivatives
    m.dx1dt = DerivativeVar(m.x1, wrt=m.t)
    m.dx2dt = DerivativeVar(m.x2, wrt=m.t)

    # ODEs - with r inlined to avoid Expression evaluation issues
    def _dx1dt_rule(m, t):
        r = m.theta1 * m.x2[t] / (m.theta2 + m.x2[t])
        return m.dx1dt[t] == (r - m.u1[t] - m.theta4) * m.x1[t]

    m.dx1dt_con = pyo.Constraint(m.t, rule=_dx1dt_rule)

    def _dx2dt_rule(m, t):
        r = m.theta1 * m.x2[t] / (m.theta2 + m.x2[t])
        return m.dx2dt[t] == -r * m.x1[t] / m.theta3 + m.u1[t] * (m.u2[t] - m.x2[t])

    m.dx2dt_con = pyo.Constraint(m.t, rule=_dx2dt_rule)

    return m


def generate_synthetic_data(initial_x1, u1_profile, u2_profile, noise_matrix="A"):
    """Generates synthetic data for biomass growth model."""
    model = create_biomass_model(time_horizon=40)

    # Set true parameter values
    model.theta1.fix(theta_true[0])
    model.theta2.fix(theta_true[1])
    model.theta3.fix(theta_true[2])
    model.theta4.fix(theta_true[3])

    # Set initial condition
    model.x1[0].fix(initial_x1)
    model.x2[0].fix(0.0)

    # Initialize Simulator
    sim = Simulator(model, package='scipy')

    # Create Suffix for varying inputs
    varying_inputs = pyo.Suffix(direction=pyo.Suffix.LOCAL)
    varying_inputs[model.u1] = u1_profile
    varying_inputs[model.u2] = u2_profile

    # Simulate - returns (time_array, profiles_array)
    tsim = np.linspace(0, 40, 401)
    tsim_result, profiles = sim.simulate(
        numpoints=len(tsim),
        integrator='vode',
        varying_inputs=varying_inputs,
    )

    # Create DataFrame from time and profiles
    df = pd.DataFrame({'t': tsim_result, 'x1': profiles[:, 0], 'x2': profiles[:, 1]})

    # Select Measurement Points
    sample_times = [1, 5, 15, 20, 30]
    df_measured = df[df["t"].isin(sample_times)].copy()

    # Add Measurement Noise
    std_devs = {}
    if noise_matrix == "A":
        std_devs = {"x1": np.sqrt(0.01), "x2": np.sqrt(0.05)}
    elif noise_matrix == "B":
        std_devs = {"x1": np.sqrt(0.1), "x2": np.sqrt(0.5)}
    else:
        raise ValueError("Invalid noise matrix type. Choose 'A' or 'B'.")

    np.random.seed(42)
    df_measured["x1_noisy"] = df_measured["x1"] + np.random.normal(
        0, std_devs["x1"], size=len(df_measured)
    )
    df_measured["x2_noisy"] = df_measured["x2"] + np.random.normal(
        0, std_devs["x2"], size=len(df_measured)
    )

    return df, df_measured


# Example usage
controls_dict = {0: (0.15, 30.0), 10: (0.20, 10.0), 25: (0.05, 20.0)}
u1_profile = {t: vals[0] for t, vals in controls_dict.items()}
u2_profile = {t: vals[1] for t, vals in controls_dict.items()}
true_data, measured_data = generate_synthetic_data(
    initial_x1=5, u1_profile=u1_profile, u2_profile=u2_profile
)

In [71]:
true_data

Unnamed: 0,t,x1,x2
0,0.0,5.000000,0.000000
1,0.1,4.969858,0.321184
2,0.2,4.979694,0.565755
3,0.3,5.001896,0.783783
4,0.4,5.030735,0.985666
...,...,...,...
396,39.6,5.259586,0.091078
397,39.7,5.261761,0.091020
398,39.8,5.263913,0.090964
399,39.9,5.266044,0.090908


In [63]:
measured_data

Unnamed: 0,t,x1,x2,x1_noisy,x2_noisy
10,1.0,5.265617,2.000276,5.315288,1.947921
50,5.0,7.694514,4.118824,7.680688,4.471947
150,15.0,6.391754,0.206251,6.456523,0.377854
200,20.0,4.804952,0.406633,4.957255,0.301655
300,30.0,4.905243,0.101489,4.881827,0.222809


# Create the Experiment Class

In [59]:
from pyomo.contrib.parmest.experiment import Experiment
from pyomo.contrib.parmest import parmest
from pyomo.contrib.doe import DesignOfExperiments


In [None]:
class BiomassExperiment(Experiment):

    def __init__(
        self,
        data_df,
        theta_initial,
        u1_profile,
        u2_profile,
        x1_initial=5.0,
        x2_initial=0.0,
        time_horizon=40,
        measurment_error_matrix="A",
        scheme='LAGRANGE-RADAU',
        nfe=10,
        ncp=3,
        
    ):
        """Biomass Experiment class for parameter estimation and design of experiments.

        Parameters
        ----------
        data_df : pandas.DataFrame
            The measured data for the experiment.
        theta_initial : list
            Initial guess for the parameters [theta1, theta2, theta3, theta4].
        u1_profile : dict
            The profile for u1 (dilution factor).
        u2_profile : dict
            The profile for u2 (substrate concentration in feed).
        x1_initial : float
            Initial biomass concentration.
        x2_initial : float
            Initial substrate concentration.
        time_horizon : float
            The time horizon for the experiment.
        measurment_error_matrix : str
            Type of measurement error matrix ("A" or "B").
        nfe : int
            Number of finite elements for discretization.
        ncp : int
            Number of collocation points per finite element.
        """
        self.data = data_df
        self.theta_initial = theta_initial
        self.u1_profile = u1_profile
        self.u2_profile = u2_profile
        self.x1_initial = x1_initial
        self.x2_initial = x2_initial
        self.time_horizon = time_horizon

        if measurment_error_matrix == "A":
            self.measurement_std_devs = {"x1": np.sqrt(0.01), "x2": np.sqrt(0.05)}
        elif measurment_error_matrix == "B":
            self.measurement_std_devs = {"x1": np.sqrt(0.1), "x2": np.sqrt(0.5)}
        else:
            raise ValueError("Invalid noise matrix type. Choose 'A' or 'B'.")

        self.scheme = scheme
        self.nfe = nfe
        self.ncp = ncp
        self.model = None

    def create_model(self):
        """Creates a Pyomo model for biomass growth."""
        m = self.model= pyo.ConcreteModel()

        # Times Set
        m.t = ContinuousSet(bounds=(0, self.time_horizon))

        # Parameters (fixed during simulation)
        m.theta1 = pyo.Var(
            initialize=self.theta_initial[0] if self.theta_initial else 1,
            bounds=(1e-6, 1),
        )
        m.theta2 = pyo.Var(
            initialize=self.theta_initial[1] if self.theta_initial else 1,
            bounds=(1e-6, 1),
        )
        m.theta3 = pyo.Var(
            initialize=self.theta_initial[2] if self.theta_initial else 1,
            bounds=(1e-6, 1),
        )
        m.theta4 = pyo.Var(
            initialize=self.theta_initial[3] if self.theta_initial else 1,
            bounds=(1e-6, 1),
        )

        # Time-varying inputs
        m.u1 = pyo.Var(m.t, bounds=(0.05, 0.20))
        m.u2 = pyo.Var(m.t, bounds=(5.0, 35.0))

        # State Variables
        m.x1 = pyo.Var(m.t, domain=pyo.NonNegativeReals)
        m.x2 = pyo.Var(m.t, domain=pyo.NonNegativeReals)

        # Derivatives
        m.dx1dt = DerivativeVar(m.x1, wrt=m.t)
        m.dx2dt = DerivativeVar(m.x2, wrt=m.t)

        # ODEs - with r inlined to avoid Expression evaluation issues
        def _dx1dt_rule(m, t):
            r = m.theta1 * m.x2[t] / (m.theta2 + m.x2[t])
            return m.dx1dt[t] == (r - m.u1[t] - m.theta4) * m.x1[t]

        m.dx1dt_con = pyo.Constraint(m.t, rule=_dx1dt_rule)

        def _dx2dt_rule(m, t):
            r = m.theta1 * m.x2[t] / (m.theta2 + m.x2[t])
            return m.dx2dt[t] == -r * m.x1[t] / m.theta3 + m.u1[t] * (m.u2[t] - m.x2[t])

        m.dx2dt_con = pyo.Constraint(m.t, rule=_dx2dt_rule)

        return m        

    def _apply_inputs_to_model(self, model):
        """
        Helper function to interpolate and fix inputs u1/u2 at all discretized time points
        """
        # Sort keys for zero-oder hold logic
        u1_times = sorted(self.u1_profile.keys())
        u2_times = sorted(self.u2_profile.keys())

        def get_val(t, times, profile):
            active_t = 0
            for ct in times:
                if ct <= t:
                    active_t = ct
                else:
                    break
            return profile[active_t]

        # Iterate over all time points in discretized model
        for t in model.t:
            model.u1[t].fix(get_val(t, u1_times, self.u1_profile))
            model.u2[t].fix(get_val(t, u2_times, self.u2_profile))

    def finalize_model(self):
        m = self.model
        # Fix the parameter values
        m.theta1.fix()
        m.theta2.fix()
        m.theta3.fix()
        m.theta4.fix()

        ### Add critical time points before discretization
        # Add Measurement times
        if self.data is not None:
            measure_times = self.data["t"].unique()
            for t in measure_times:
                if t not in m.t:
                    m.t.add(t)
        # Add control switch times
        control_times = list(self.u1_profile.keys()) + list(self.u2_profile.keys())
        control_times = sorted(set(control_times))
        for t in control_times:
            if t not in m.t and t <= m.t.last():
                m.t.add(t)

        ### Discretize the model
        if self.scheme in ('LAGRANGE-RADAU', 'LAGRANGE-LEGENDRE'):
            discretizer = pyo.TransformationFactory('dae.collocation')
            discretizer.apply_to(m, nfe=self.nfe, ncp=self.ncp, scheme=self.scheme)
        elif self.scheme in ("BACKWARD", "FORWARD", "CENTRAL"):
            discretizer = pyo.TransformationFactory('dae.finite_difference')
            discretizer.apply_to(m, nfe=self.nfe, scheme=self.scheme)
        else:
            raise ValueError("Invalid discretization scheme. Choose 'LAGRANGE-RADAU' or 'LAGRANGE-LEGENDRE'.")

        ### Fix inputs at all discretized time points
        self._apply_inputs_to_model(m)

        # Fix initial conditions
        m.x1[0].fix(self.x1_initial)
        m.x2[0].fix(self.x2_initial)

        # Intialize the state variables: we can do that by simulating the model (better approach)
        # or setting to some reasonable guess (easy to implement)
        if self.simulation_initialization:
            sim = Simulator(m, package='scipy')
            tsim = np.linspace(0, self.time_horizon, 401)
            tsim_result, profiles = sim.simulate(
                numpoints=len(tsim),
                integrator='vode',
            )
            for i, t in enumerate(m.t):
                m.x1[t].set_value(profiles[i, 0])
                m.x2[t].set_value(profiles[i, 1])
        else:
            for t in m.t:
                m.x1[t].set_value(5.0)  # Initial guess
                m.x2[t].set_value(0.0)  # Initial guess

        # solver = pyo.SolverFactory('ipopt')
        # solver.options["linear_solver"] = "ma57"
        # results = solver.solve(m, tee=True)
        return m

    def label_experiment(self):
        m = self.model

        # Set measurement variable labels
        m.experiment_outputs = pyo.Suffix(direction=pyo.Suffix.LOCAL)
        m.experiment_outputs.update((m.x1[t], x1_val) for t, x1_val in zip(self.data["t"], self.data["x1_noisy"]))
        m.experiment_outputs.update((m.x2[t], x2_val) for t, x2_val in zip(self.data["t"], self.data["x2_noisy"]))

        # Set design variables / inputs labels
        m.experiment_inputs = pyo.Suffix(direction=pyo.Suffix.LOCAL)
        m.experiment_inputs.update((m.u1[t], u1_val) for t, u1_val in self.u1_profile.items())
        m.experiment_inputs.update((m.u2[t], u2_val) for t, u2_val in self.u2_profile.items())
        m.experiment_inputs[m.x1[m.t.first()]] = None

        # Set Unknown Parameters labels
        m.unknown_parameters = pyo.Suffix(direction=pyo.Suffix.LOCAL)
        m.unknown_parameters.update((k, pyo.value(k)) for k in [m.theta1, m.theta2, m.theta3, m.theta4])

        # Set measurement standard deviations
        m.measurement_error = pyo.Suffix(direction=pyo.Suffix.LOCAL)
        m.measurement_error.update((m.x1[t], self.measurement_std_devs["x1"]) for t in self.data["t"])
        m.measurement_error.update((m.x2[t], self.measurement_std_devs["x2"]) for t in self.data["t"])

        return m

    def get_labeled_model(self):
        if self.model is None:
            self.create_model()
            self.finalize_model()
            self.label_experiment()
        return self.model

In [162]:
# Example usage of BiomassExperiment
theta_initial_more_accurate = [0.357, 0.153, 0.633, 0.043]
theta_initial_less_accurate = [0.527, 0.054, 0.935, 0.015]
controls_dict = {0: (0.15, 30.0), 10: (0.20, 10.0), 25: (0.05, 10.0)}
# controls_dict = {0: (0.15, 30.0), 10: (0.20, 10.0)}
u1_profile = {t: vals[0] for t, vals in controls_dict.items()}
u2_profile = {t: vals[1] for t, vals in controls_dict.items()}
experiment = BiomassExperiment(
    data_df=measured_data,
    theta_initial=theta_initial_more_accurate,
    u1_profile=u1_profile,
    u2_profile=u2_profile,
    x1_initial=5.0,
    x2_initial=0.0,
    time_horizon=40,
    measurment_error_matrix="A",
    scheme="LAGRANGE-RADAU",
    nfe=20,
    ncp=3,
)

pest = parmest.Estimator([experiment], obj_function='SSE', tee=True)

obj, theta = pest.theta_est()
# model = experiment.get_labeled_model()

Ipopt 3.13.2: 

******************************************************************************
This program contains Ipopt, a library for large-scale nonlinear optimization.
 Ipopt is released as open source code under the Eclipse Public License (EPL).
         For more information visit http://projects.coin-or.org/Ipopt

This version of Ipopt was compiled from source code available at
    https://github.com/IDAES/Ipopt as part of the Institute for the Design of
    Advanced Energy Systems Process Systems Engineering Framework (IDAES PSE
    Framework) Copyright (c) 2018-2019. See https://github.com/IDAES/idaes-pse.

This version of Ipopt was compiled using HSL, a collection of Fortran codes
    for large-scale scientific computation.  All technical papers, sales and
    publicity material resulting from use of the HSL codes within IPOPT must
    contain the following acknowledgement:
        HSL, a collection of Fortran codes for large-scale scientific
        computation. See http://

In [163]:
obj

0.3471394226373234

In [164]:
theta

theta1    0.253645
theta2    0.249814
theta3    0.439945
theta4    0.000593
dtype: float64