<a href="https://colab.research.google.com/github/yixuanzho/Research-project/blob/main/tests/solve_dynamic_system_test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Dynamic Flux Balance Analysis (dFBA) in COBRApy
# The following notebook shows a simple, but slow example of implementing dFBA using COBRApy and scipy.integrate.solve_ivp.
# This notebook shows a static optimization approach (SOA) implementation and should not be considered production ready.
# The model considers only basic Michaelis-Menten limited growth on glucose.
import arviz as az
import numpy as np
import pandas as pd
import pymc as pm
import pytensor
import pytensor.tensor as pt
from tqdm import tqdm

from numba import njit
from pytensor.compile.ops import as_op
from scipy.integrate import solve_ivp

import matplotlib.pyplot as plt
# %matplotlib inline


In [2]:
# %load_ext watermark
az.style.use("arviz-darkgrid")
rng = np.random.default_rng(1234)
plt.ioff()  # turn off interactive plotting
output_messages = []  # store printed output


In [3]:
# Create or load a cobrapy model. Here, we use the ‘textbook’ e-coli core model.
import cobra
from cobra.io import load_model
cobra_model = load_model('textbook')


In [4]:
# Set up the dynamic system
# Dynamic flux balance analysis couples a dynamic system in external cellular concentrations to a pseudo-steady state metabolic model.
# In this notebook, we define the function add_dynamic_bounds(model, y) to convert the external metabolite concentrations into bounds on the boundary fluxes in the metabolic model.
def add_dynamic_bounds(cobra_model, y, parameters):
    """Use external concentrations to bound the uptake flux of glucose."""
    biomass, glucose = y  # expand the boundary species
    # Assumption: Michaelis-Menten kinetics for glucose uptake
    # V = maximum reaction rate
    # Km = Michaelis constant, which represents the substrate concentration at which the reaction rate is half of V.
    V, Km = parameters
    glucose_max_import = -V * glucose / (Km + glucose)
    cobra_model.reactions.EX_glc__D_e.lower_bound = glucose_max_import


def dynamic_system(t, y, parameters):
    """Calculate the time derivative of external species."""

    biomass, glucose = y  # expand the boundary species

    # Calculate the specific exchanges fluxes at the given external concentrations.
    with cobra_model:
        add_dynamic_bounds(cobra_model, y, parameters)

        cobra.util.add_lp_feasibility(cobra_model)
        feasibility = cobra.util.fix_objective_as_constraint(cobra_model)
        lex_constraints = cobra.util.add_lexicographic_constraints(
            cobra_model, ['Biomass_Ecoli_core', 'EX_glc__D_e'], ['max', 'max'])

    # Since the calculated fluxes are specific rates, we multiply them by the
    # biomass concentration to get the bulk exchange rates.
    fluxes = lex_constraints.values
    fluxes *= biomass

    # This implementation is **not** efficient, so I display the current
    # simulation time using a progress bar.
    if dynamic_system.pbar is not None:
        dynamic_system.pbar.update(1)
        dynamic_system.pbar.set_description('t = {:.3f}'.format(t))

    return fluxes

dynamic_system.pbar = None


def infeasible_event(t, y, parameters):
    """
    Determine solution feasibility.

    Avoiding infeasible solutions is handled by solve_ivp's built-in event detection.
    This function re-solves the LP to determine whether or not the solution is feasible
    (and if not, how far it is from feasibility). When the sign of this function changes
    from -epsilon to positive, we know the solution is no longer feasible.

    """

    with cobra_model:

        add_dynamic_bounds(cobra_model, y, parameters)

        cobra.util.add_lp_feasibility(cobra_model)
        feasibility = cobra.util.fix_objective_as_constraint(cobra_model)

    return feasibility - infeasible_event.epsilon

infeasible_event.epsilon = 1E-6
infeasible_event.direction = 1
infeasible_event.terminal = True


In [5]:
# Set up the simulation
def solve_dynamic_system(parameters):

    # with tqdm() as pbar:
        # dynamic_system.pbar = pbar

        V, Km = parameters

        sol = solve_ivp(
            fun=dynamic_system,
            events=[infeasible_event],
            t_span=(ts.min(), ts.max()),
            y0=y0,
            t_eval=ts,
            rtol=1e-6,
            atol=1e-8,
            method='BDF',
            args=(parameters,)
        )

        return sol


In [6]:
# Run the dynamic FBA simulation
ts = np.linspace(0, 15, 100)  # Desired integration resolution and interval
y0 = [0.1, 10]  # Initial conditions for biomass and glucose
parameters = [10, 5]  # Parameters for the Michaelis-Menten kinetics

sol = solve_dynamic_system(parameters)


In [7]:
# Because the culture runs out of glucose, the simulation terminates early.
# The exact time of this ‘cell death’ is recorded in sol.t_events.
sol


  message: A termination event occurred.
  success: True
   status: 1
        t: [ 0.000e+00  1.515e-01 ...  5.606e+00  5.758e+00]
        y: [[ 1.000e-01  1.090e-01 ...  8.715e-01  8.727e-01]
            [ 1.000e+01  9.895e+00 ...  3.476e-01  2.710e-01]]
      sol: None
 t_events: [array([ 5.802e+00])]
 y_events: [array([[ 8.728e-01,  2.518e-01]])]
     nfev: 179
     njev: 2
      nlu: 14

In [8]:
# Plot timelines of biomass and glucose
# ax = plt.subplot(111)
# ax.plot(sol.t, sol.y.T[:, 0], color='b')
# ax2 = plt.twinx(ax)
# ax2.plot(sol.t, sol.y.T[:, 1], color='r')

# ax.set_ylabel('Biomass', color='b')
# ax2.set_ylabel('Glucose', color='r')
# plt.show()


In [9]:
# Store the simulation results in a pandas DataFrame
data = pd.DataFrame(dict(time=sol.t, biomass=sol.y[0], glucose=sol.y[1]))
data.head()


Unnamed: 0,time,biomass,glucose
0,0.0,0.1,10.0
1,0.151515,0.108976,9.894703
2,0.30303,0.118717,9.780402
3,0.454545,0.129279,9.656422
4,0.606061,0.140723,9.522053


In [10]:
# Gradient-Free Sampler Options
# Like other Numpy or Scipy-based functions, the scipy.integrate.solve_ivp function cannot be used directly in a PyMC model
# because PyMC needs to know the variable input and output types to compile.
# Therefore, we use a Pytensor wrapper to give the variable types to PyMC.
# Then the function can be used in PyMC in conjunction with gradient-free samplers.

# Convert Python Function to a Pytensor Operator using @as_op decorator
# We tell PyMC the input variable types and the output variable types using the @as_op decorator.
# solve_ivp returns Numpy arrays, but we tell PyMC that they are Pytensor double float tensors for this purpose.

# Remember, when using @as_op with PyMC, the function becomes a "black box" to PyMC, meaning it can't compute gradients through it.
# This limits you to using gradient-free samplers.

# decorator with input and output types a Pytensor double float tensors
@as_op(itypes=[pt.dvector, pt.dvector], otypes=[pt.dmatrix])
def pytensor_model(parameters, y0):
    # with tqdm() as pbar:
        # dynamic_system.pbar = pbar

        V, Km = parameters

        sol = solve_ivp(
            fun=dynamic_system,
            events=[infeasible_event],
            t_span=(ts.min(), ts.max()),
            y0=y0,
            t_eval=ts,
            rtol=1e-6,
            atol=1e-8,
            method='BDF',
            args=(parameters,)
        )

        # Important: Ensure we return the same dimensions as our data
        # Create a result array of the same shape as the data
        result = np.zeros_like(data[["biomass", "glucose"]].values)

        # Fill in the values we have (might be fewer than expected due to early termination)
        rows_to_fill = min(len(sol.t), result.shape[0])
        result[:rows_to_fill, 0] = sol.y[0, :rows_to_fill]
        result[:rows_to_fill, 1] = sol.y[1, :rows_to_fill]

        return result


In [11]:
# PyMC Model
# Now, we can specify the PyMC model using the ODE solver.
# For priors, we will use the manually set values: parameters = [10, 5] for V and Km,
# and y0 = [0.1, 10] for initial biomass and glucose concentrations.
# These are empirically derived weakly informative priors that are constrained to be positive.
# We will use a normal likelihood on untransformed data (i.e., not log transformed) to best fit the peaks of the data.

with pm.Model() as model:
    # Priors for kinetic parameters and initial conditions
    V = pm.TruncatedNormal("V", mu=parameters[0], sigma=1.0, lower=0, initval=parameters[0])
    Km = pm.TruncatedNormal("Km", mu=parameters[1], sigma=0.5, lower=0, initval=parameters[1])
    biomassT0 = pm.TruncatedNormal("biomassT0", mu=y0[0], sigma=1, lower=0, initval=y0[0])
    glucoseT0 = pm.TruncatedNormal("glucoseT0", mu=y0[1], sigma=1, lower=0, initval=y0[1])
    sigma = pm.HalfNormal("sigma", 10)

    # Ode solution function
    ode_solution = pytensor_model(
        pm.math.stack([V, Km]),
        pm.math.stack([biomassT0, glucoseT0])
    )

    # Likelihood
    pm.Normal("Y_obs", mu=ode_solution, sigma=sigma, observed=data[["biomass", "glucose"]].values)


In [12]:
# pm.model_to_graphviz(model=model)


In [13]:
# Gradient-Free Sampler Options
# Having good gradient free samplers can open up the models that can be fit within PyMC. There are five options for gradient-free samplers in PyMC that are applicable to this problem (?):
# Slice - the default gradient-free sampler
# DEMetropolisZ - a differential evolution Metropolis sampler that uses the past to inform sampling jumps
# DEMetropolis - a differential evolution Metropolis sampler
# Metropolis - the vanilla Metropolis sampler
# SMC - Sequential Monte Carlo

# A few notes on running these inferences.
# For each sampler, the number of tuning steps and draws have been reduced to run the inference in a reasonable amount of time (on the order of minutes).
# This is not a sufficient number of draws to get a good inferences, in some cases, but it works for demonstration purposes.
# In addition, multicore processing was not working for the Pytensor op function on all machines, so inference is performed on one core.


In [14]:
# Slice Sampler
# Variable list to give to the sample step parameter
vars_list = list(model.values_to_rvs.keys())[:-1]


In [15]:

# Specify the sampler
# sampler = "Slice Sampler"  # The Slice sampler was very slow
# tune = draws = 100

sampler = "DEMetropolisZ"  # Use Metropolis instead
tune = draws = 10  # Start with minimal tuning and draws for demonstration purposes

# Inference!
if __name__ == '__main__':
    with model:
        trace_slice = pm.sample(step=[pm.Metropolis(vars_list)], tune=tune, draws=draws)
    trace = trace_slice
    summary = az.summary(trace)

    output_messages.append("\nSampling summary:")
    output_messages.append(str(summary))

    print("\n".join(output_messages))

    # az.plot_trace(trace, kind="rank_bars")
    # plt.suptitle(f"Trace Plot {sampler}");

    # fig, ax = plt.subplots(figsize=(12, 4))
    # plot_inference(ax, trace, title=f"Data and Inference Model Runs\n{sampler} Sampler");

    # plt.show(block=True)





Output()


Sampling summary:
             mean     sd  hdi_3%  hdi_97%  mcse_mean  mcse_sd  ess_bulk  \
Km          4.474  0.539   3.949    5.000      0.241    0.182       5.0   
V          10.582  0.428  10.075   10.997      0.191    0.144       5.0   
biomassT0   0.074  0.004   0.067    0.079      0.002    0.001       5.0   
glucoseT0  10.437  0.374  10.072   10.802      0.167    0.126       5.0   
sigma       0.500  0.425   0.173    1.488      0.181    0.136       5.0   

           ess_tail        r_hat  
Km              5.0  92155435.79  
V               7.0         3.34  
biomassT0       5.0         4.92  
glucoseT0       5.0  92155435.79  
sigma          20.0         3.70  


  (between_chain_variance / within_chain_variance + num_samples - 1) / (num_samples)
