In [1]:
from typing import Dict, Union, Optional, List, Any
from datetime import datetime
import json

import numpy as np
from COPASI import CDataModel
from numpy import ndarray, dtype, array, append as npAppend
from pandas import DataFrame
from basico import *
from process_bigraph import Process

from biosimulators_processes import CORE
from biosimulators_processes.data_model.sed_data_model import MODEL_TYPE


class CopasiProcess(Process):
    """Process that steps a COPASI model through ``basico``.

    The model is either loaded from a file path or created empty and then
    populated from ``config['model']['model_changes']``.  On every update the
    incoming species values are written into the COPASI model, a time course
    is run for the requested interval, and the resulting species values are
    returned.

    Config:
        model: ``MODEL_TYPE`` mapping; the keys read here are ``model_source``,
            ``model_changes`` and ``model_units``.
        method: name of the COPASI time-course method
            (default ``'deterministic'``).
    """
    config_schema = {
        'model': MODEL_TYPE,
        'method': {
            '_type': 'string',
            '_default': 'deterministic'
        }
    }

    def __init__(self,
                 config: Optional[Dict[str, Any]] = None,
                 core: Dict = None):
        """Load or create the COPASI model, apply configured changes and cache its metadata.

        Raises:
            AssertionError: if no model source is configured.
            AttributeError: if the source is not a path and no model changes are given.
        """
        super().__init__(config, core)

        # insert copasi process model config
        model_config = self.config['model']
        model_source = model_config.get('model_source') or self.config.get('sbml_fp')
        assert model_source is not None, 'You must specify a model source of either a valid biomodel id or model filepath.'
        self.model_changes = model_config.get('model_changes') or {}

        # Option A: the source looks like a path, so load the model file
        if '/' in model_source:
            self.copasi_model_object = load_model(model_source)
            print('found a filepath')

        # Option C: start from an empty model that the changes will populate
        else:
            if not self.model_changes:
                raise AttributeError(
                    """You must pass a source of model changes specifying params, reactions, 
                        species or all three if starting from an empty model.""")
            model_units = model_config.get('model_units') or {}
            self.copasi_model_object = new_model(
                name='CopasiProcess TimeCourseModel',
                **model_units)

        # handle context of species output
        context_type = self.config.get('species_context', 'concentrations')
        self.species_context_key = f'floating_species_{context_type}'
        # Particle numbers are used only when the context asks for counts.
        self.use_counts = 'counts' in context_type

        # Reactions: apply changes first so the cached names/schemes reflect them.
        self._set_reaction_changes()
        reactions = get_reactions(model=self.copasi_model_object)
        if reactions is None:
            self.reaction_list = []
            self.reactions = {}
        else:
            self.reaction_list = reactions.index.tolist()
            self.reactions = dict(zip(self.reaction_list, reactions['scheme'].tolist()))

        # Get the species (floating only)  TODO: add boundary species
        self._set_species_changes()
        species_data = get_species(model=self.copasi_model_object)
        if species_data is None:
            self.floating_species_list = []
            self.floating_species_initial = []
        else:
            self.floating_species_list = species_data.index.tolist()
            self.floating_species_initial = species_data[self._species_value_column()].tolist()

        # Get the list of parameters and their values (it is possible to run a model without any parameters)
        self._set_global_param_changes()
        model_parameters = get_parameters(model=self.copasi_model_object)
        if isinstance(model_parameters, DataFrame):
            self.model_parameters_list = model_parameters.index.tolist()
            self.model_parameters_values = model_parameters.initial_value.tolist()
        else:
            self.model_parameters_list = []
            self.model_parameters_values = []

        # Get a list of compartments
        compartments = get_compartments(model=self.copasi_model_object)
        self.compartments_list = compartments.index.tolist() if compartments is not None else []

        # ----SOLVER: Get the solver (defaults to deterministic)
        self.method = self.config['method']

    def _species_value_column(self) -> str:
        """Return the species column/keyword that matches the configured context."""
        return 'particle_number' if self.use_counts else 'concentration'

    @staticmethod
    def _is_unset(value) -> bool:
        """Return True for ``None`` and empty strings/containers; numeric zero counts as set."""
        if value is None:
            return True
        if isinstance(value, (str, list, tuple, dict, set)):
            return len(value) == 0
        return False

    @staticmethod
    def _iter_parameter_changes(param_changes):
        """Yield ``(parameter_name, value)`` pairs from the supported change layouts.

        Accepts a ``{name: value}`` mapping, a list of such mappings, or a list
        of ``(name, value)`` pairs.
        """
        if not param_changes:
            return
        if isinstance(param_changes, dict):
            yield from param_changes.items()
            return
        for change in param_changes:
            if isinstance(change, dict):
                yield from change.items()
            else:
                param_name, param_value = change
                yield param_name, param_value

    def initial_state(self):
        """Return the starting time, parameter values, species values and reaction schemes."""
        # keep in mind that a valid simulation may not have global parameters
        model_parameters_dict = dict(
            zip(self.model_parameters_list, self.model_parameters_values))

        floating_species_dict = dict(
            zip(self.floating_species_list, self.floating_species_initial))

        return {
            'time': 0.0,
            'model_parameters': model_parameters_dict,
            self.species_context_key: floating_species_dict,
            'reactions': self.reactions
        }

    def inputs(self):
        """Describe the input ports; the species port name depends on the species context."""
        floating_species_type = {
            species_id: {
                '_type': 'float',
                '_apply': 'set'}
            for species_id in self.floating_species_list
        }

        model_params_type = {
            param_id: {
                '_type': 'float',
                '_apply': 'set'}
            for param_id in self.model_parameters_list
        }

        return {
            'time': 'float',
            self.species_context_key: floating_species_type,
            'model_parameters': model_params_type,
            'reactions': 'tree[string]'
        }

    def outputs(self):
        """Describe the output ports; the species port name depends on the species context."""
        floating_species_type = {
            species_id: {
                '_type': 'float',
                '_apply': 'set'}
            for species_id in self.floating_species_list
        }
        return {
            'time': 'float',
            self.species_context_key: floating_species_type,
            'reactions': 'tree[string]'
        }

    def update(self, inputs, interval):
        """Write the incoming species values into COPASI, run for ``interval`` and return end values.

        Args:
            inputs: state for the input ports; ``inputs[self.species_context_key]``
                maps species names to values and ``inputs['time']`` is passed to
                ``run_time_course`` as ``start_time``.
            interval: duration of the time course.

        Returns:
            dict with the reaction schemes and the species values at the end of the run.
        """
        value_column = self._species_value_column()

        # set copasi values according to what is passed in states
        for species_id, value in inputs[self.species_context_key].items():
            # exact=True mirrors the exact lookup used when reading the values back below
            set_species(
                name=species_id,
                exact=True,
                model=self.copasi_model_object,
                **{value_column: value})

        # run model for "interval" length; we only want the state at the end
        run_time_course(
            start_time=inputs['time'],
            duration=interval,
            update_model=True,
            model=self.copasi_model_object,
            method=self.method)

        # extract end values from the model and set them in results
        species_values = {}
        for mol_id in self.floating_species_list:
            species_row = get_species(
                name=mol_id,
                exact=True,
                model=self.copasi_model_object)
            # .iloc is explicit positional access; plain [0] on a name-indexed Series is deprecated
            species_values[mol_id] = float(species_row[value_column].iloc[0])

        return {
            'reactions': self.reactions,
            self.species_context_key: species_values
        }

    def _set_reaction_changes(self):
        """Apply ``model_changes['reaction_changes']``: schemes first, then reaction parameters."""
        # ----REACTIONS: set reactions
        reaction_changes = self.model_changes.get('reaction_changes') or []
        if not reaction_changes:
            return

        existing_reactions = get_reactions(model=self.copasi_model_object)
        known_reaction_names = set(existing_reactions.index.tolist()) if existing_reactions is not None else set()

        for reaction_change in reaction_changes:
            reaction_name: str = reaction_change['reaction_name']
            scheme_change: str = reaction_change.get('reaction_scheme')

            if scheme_change:
                if reaction_name in known_reaction_names:
                    # handle changes to existing reactions
                    set_reaction(name=reaction_name, scheme=scheme_change, model=self.copasi_model_object)
                else:
                    # handle new reactions
                    add_reaction(reaction_name, scheme_change, model=self.copasi_model_object)
                    known_reaction_names.add(reaction_name)

            # Parameters go last so they target the reaction as it exists after the scheme change.
            for param_name, param_change_val in self._iter_parameter_changes(
                    reaction_change.get('parameter_changes')):
                set_reaction_parameters(param_name, value=param_change_val, model=self.copasi_model_object)

    def _set_species_changes(self):
        """Apply ``model_changes['species_changes']`` to the named species."""
        # ----SPECS: set species changes
        species_changes = self.model_changes.get('species_changes') or []
        for species_change in species_changes:
            if not isinstance(species_change, dict):
                continue
            # Work on a copy so the caller's config still contains 'name' afterwards.
            change = dict(species_change)
            species_name = change.pop('name')
            changes_to_apply = {
                spec_param_type: spec_param_value
                for spec_param_type, spec_param_value in change.items()
                if not self._is_unset(spec_param_value)
            }
            if changes_to_apply:
                set_species(
                    name=species_name,
                    exact=True,
                    **changes_to_apply,
                    model=self.copasi_model_object)

    def _set_global_param_changes(self):
        """Apply ``model_changes['global_parameter_changes']``, adding parameters that do not exist.

        Raises:
            AssertionError: if a new parameter is given without an initial value.
        """
        # ----GLOBAL PARAMS: set global parameter changes
        global_parameter_changes = self.model_changes.get('global_parameter_changes') or []
        for param_change in global_parameter_changes:
            # Work on a copy so the caller's config still contains 'name' afterwards.
            change = dict(param_change)
            param_name = change.pop('name')
            settings = {
                param_type: param_value
                for param_type, param_value in change.items()
                if not self._is_unset(param_value)
            }

            global_params = get_parameters(model=self.copasi_model_object)
            existing_global_parameters = global_params.index.tolist() \
                if isinstance(global_params, DataFrame) else []

            if param_name in existing_global_parameters:
                # handle changes to existing params
                if settings:
                    set_parameters(name=param_name, exact=True, **settings, model=self.copasi_model_object)
            else:
                # set new params; 'initial_concentration' is accepted as an alias for the starting value
                if 'initial_value' not in settings and 'initial_concentration' in settings:
                    settings['initial_value'] = settings['initial_concentration']
                assert settings.get('initial_value') is not None, \
                    "You must pass an initial_value (or initial_concentration) if adding a new global parameter."
                add_parameter(name=param_name, **settings, model=self.copasi_model_object)

Cannot register SmoldynStep. Error:
**
No module named 'simulariumio'
**
Cannot register SimulariumSmoldynStep. Error:
**
No module named 'simulariumio'
**
Cannot register MongoDatabaseEmitter. Error:
**
No module named 'simulariumio'
**


In [2]:
import os
import warnings
from pathlib import Path
from random import randint


import numpy as np
import cobra
from cobra import Model, Reaction, Metabolite
from cobra.io import read_sbml_model
from process_bigraph import Process, Composite, ProcessTypes, Step

from biosimulators_processes import CORE
from biosimulators_processes.data_model.sed_data_model import MODEL_TYPE
# from biosimulators_processes.viz.plot import plot_time_series, plot_species_distributions_to_gif


# logging.getLogger('cobra').setLevel(logging.ERROR)


class CobraProcess(Process):
    """Flux-balance process that couples species values to a COBRApy model.

    The COBRA model is read from ``config['model']['model_source']`` when given;
    otherwise it is built on the first update from the incoming reaction schemes.
    Each update bounds the reactions with Michaelis-Menten style uptake rates,
    optimizes the model and returns per-substrate deltas.

    Config:
        model: ``MODEL_TYPE`` mapping; only ``model_source`` is read here.
        model_name: name used when a model is generated (default ``'FBA'``).
    """
    config_schema = {
        'model': MODEL_TYPE,
        'model_name': {
            '_type': 'string',
            '_default': 'FBA'
        }
    }

    def __init__(self, config, core):
        """Read the SBML model if a source is configured; otherwise defer model creation."""
        super().__init__(config, core)

        self.model_name = self.config.get('model_name')
        model_file = self.config['model'].get('model_source')
        if model_file:
            self.model = read_sbml_model(str(Path(model_file).resolve()))
        else:
            # no source: the model is generated lazily in update()
            self.model = None

        self.reaction_mappings = None
        self.substrates_input = None

    def initial_state(self):
        """Return random starting values (1.0-3.0) for the six repressilator species."""
        # NOTE(review): values are drawn without a seed, so runs are not reproducible.
        names = [
            'LacI protein',
            'TetR protein',
            'cI protein',
            'LacI mRNA',
            'TetR mRNA',
            'cI mRNA'
        ]
        subs = [float(randint(1, 3)) for _ in names]
        return {'substrates': dict(zip(names, subs)), 'substrates_input': {}}

    def inputs(self):
        """Describe the input ports."""
        return {
            'reactions': 'tree[string]',
            'substrates': 'tree[float]'
        }

    def outputs(self):
        """Describe the output ports."""
        return {
            'substrates': 'tree[float]',
            'substrates_input': 'tree[float]'
        }

    def update(self, state, interval):
        """Solve the FBA problem for the incoming substrates and return the substrate deltas.

        Args:
            state: mapping with ``'substrates'`` (name -> value) and
                ``'reactions'`` (reaction name -> scheme).
            interval: unused by this method.

        Returns:
            dict with the substrate deltas and the substrates that were received.
        """
        # parse input state
        self.substrates_input = state.get('substrates') or {}
        reactions = state.get('reactions') or {}
        species_names = list(self.substrates_input.keys())
        self.reaction_mappings = self.generate_reaction_mappings(species_names, reactions)

        # generate model based on input state data when none was loaded from file
        if self.model is None:
            self.model = self.generate_model(reactions, species_names)

        substrate_update = self.get_substrates_update()
        return {'substrates': substrate_update, 'substrates_input': self.substrates_input}

    @staticmethod
    def _parse_scheme(scheme):
        """Split a reaction scheme into ``(reactants, products, reversible)``.

        ``reactants`` and ``products`` map species names to stoichiometric
        counts.  Text after ``;`` (modifiers) is ignored, species are separated
        by `` + `` and may carry an ``n * `` coefficient; ``=`` marks a
        reversible reaction and ``->`` an irreversible one.
        """
        equation = scheme.split(';', 1)[0]
        if '->' in equation:
            left, _, right = equation.partition('->')
            reversible = False
        elif '=' in equation:
            left, _, right = equation.partition('=')
            reversible = True
        else:
            left, right, reversible = equation, '', False

        def parse_side(side):
            counts = {}
            for term in side.split(' + '):
                term = term.strip()
                if not term:
                    continue
                coefficient = 1.0
                if ' * ' in term:
                    head, _, tail = term.partition(' * ')
                    try:
                        coefficient = float(head)
                        term = tail
                    except ValueError:
                        # not a numeric prefix: treat the whole term as a species name
                        pass
                species = term.strip().replace('"', '')
                counts[species] = counts.get(species, 0.0) + coefficient
            return counts

        return parse_side(left), parse_side(right), reversible

    def generate_model(self, reactions_dict, species_names) -> Model:
        """Build a COBRA model from reaction schemes.

        Args:
            reactions_dict: mapping of reaction name to scheme string.
            species_names: names of the species to create as metabolites;
                scheme species that are not listed are ignored.

        Returns:
            A new ``Model`` named ``self.model_name``.
        """
        model = Model(self.model_name)

        # generate metabolites
        metabolite_dict = {}
        for species in species_names:
            species_id = species.replace(" ", "_").replace('"', '')
            metabolite_dict[species] = Metabolite(
                species_id,
                name=species,
                compartment='cell'
            )

        for reaction_name, reaction_scheme in reactions_dict.items():
            reaction = Reaction(reaction_name.replace(" ", "_"), name=reaction_name)
            reactants, products, reversible = self._parse_scheme(reaction_scheme)

            # Reactants are consumed (negative), products are produced (positive);
            # a species on both sides contributes its net coefficient.
            stoichiometry = {}
            for side, sign in ((reactants, -1.0), (products, 1.0)):
                for species, count in side.items():
                    if species in metabolite_dict:
                        metabolite = metabolite_dict[species]
                        stoichiometry[metabolite] = stoichiometry.get(metabolite, 0.0) + sign * count
            stoichiometry = {
                metabolite: coefficient
                for metabolite, coefficient in stoichiometry.items()
                if coefficient != 0
            }

            reaction.add_metabolites(stoichiometry)
            if reversible:
                reaction.lower_bound = -1000.0
            model.add_reactions([reaction])

        return model

    def apply_reaction_kinetics(self):
        """
        Apply Michaelis-Menten kinetics to the reactions based on substrate concentrations.

        For every metabolite with a known input concentration, the lower bound of
        each reaction that metabolite takes part in is set to the negative uptake
        rate.  Metabolites without an input concentration are skipped.
        """
        substrates = self.substrates_input or {}
        for metabolite in self.model.metabolites:
            substrate_concentration = substrates.get(metabolite.name)
            if substrate_concentration is None:
                continue

            # Km and Vmax can be set dynamically; NOTE(review): drawn randomly and unseeded.
            Km, Vmax = (float(randint(1, 3)), randint(1, 3))
            denominator = Km + substrate_concentration
            if denominator == 0:
                continue
            uptake_rate = Vmax * substrate_concentration / denominator

            # Assign uptake rate to each relevant reaction's lower bound
            for reaction in metabolite.reactions:
                reaction.lower_bound = -uptake_rate

    def generate_reaction_mappings(self, output_names, reactions) -> list:
        """Match species names to reactions by the words in the reaction names.

        Args:
            output_names: species names of the form ``'<name> <type>'`` where the
                last word is ``'mRNA'`` or ``'protein'``.
            reactions: mapping keyed by reaction name (or any iterable of
                reaction names / objects exposing ``name``).

        Returns:
            A list of single-entry dicts ``{species_name: reaction_name}``.
        """
        mappings = []
        for reaction in reactions:
            # iterating a mapping yields its keys, i.e. the reaction names
            reaction_name = getattr(reaction, 'name', reaction)
            rxn = [word.lower() for word in str(reaction_name).split(" ")]
            for name in output_names:
                name_parts = name.split(" ")
                obs_name = name_parts[0].lower()
                obs_type = name_parts[-1]
                if obs_name not in rxn:
                    continue
                matched = False
                if "transcription" in rxn and obs_type == "mRNA":
                    matched = True
                elif "translation" in rxn and obs_type == "protein":
                    matched = True
                elif "degradation" in rxn:
                    if "transcripts" in rxn and obs_type == "mRNA":
                        matched = True
                    elif "transcripts" not in rxn and obs_type == "protein":
                        matched = True
                if matched:
                    mappings.append({name: reaction_name})
        return mappings

    def _find_reaction(self, identifier):
        """Return the model reaction whose id or name equals ``identifier``, or None."""
        for reaction in self.model.reactions:
            if reaction.id == identifier or reaction.name == identifier:
                return reaction
        return None

    def apply_ode_results_to_fba(self, time_point, ode_results) -> None:
        """
        Apply ODE results to COBRApy model reactions based on mappings.

        Mapped reactions that cannot be found in the model are skipped.

        :param time_point: Current time step in the ODE simulation
        :param ode_results: Time-series DataFrame with concentration data
        """
        for mapping in self.reaction_mappings or []:
            for ode_species, cobra_reaction in mapping.items():
                reaction = self._find_reaction(cobra_reaction)
                if reaction is None:
                    continue
                concentration = ode_results.loc[time_point, ode_species]

                # TODO: customize this
                if 'degradation' in cobra_reaction:
                    reaction.lower_bound = -concentration  # Assuming a negative flux for degradation
                else:
                    # TODO: customize this
                    reaction.upper_bound = concentration

    def get_substrates_update(self):
        """Optimize the model and convert reaction fluxes into substrate deltas.

        Returns:
            dict mapping substrate name to the change in its value.  When the
            solver does not report an optimal solution, every known substrate
            gets a delta of 0.0 and a warning is issued.
        """
        from optlang.symbolics import Zero

        substrates_input = self.substrates_input or {}

        # set the model objective to a weighted sum across all model reactions TODO: change this
        reactions = self.model.reactions
        weights = [1.0] * len(reactions)
        objective_expression = Zero  # Start with zero
        for reaction, weight in zip(reactions, weights):
            objective_expression += weight * reaction.flux_expression
        self.model.objective = objective_expression

        # apply MM kinetics
        self.apply_reaction_kinetics()

        # now that bounds are updated, solve
        solution = self.model.optimize()
        if solution.status != 'optimal':
            warnings.warn(f'FBA solution status is {solution.status!r}; substrates are left unchanged.')
            return {name: 0.0 for name in substrates_input}

        # update concentrations based on fluxes
        substrate_update = {}
        for metabolite in self.model.metabolites:
            lookup_key = metabolite.name
            if lookup_key not in substrates_input:
                continue
            old_concentration = substrates_input[lookup_key]
            # NOTE(review): each reaction overwrites the previous one, so the last reaction's flux wins.
            for reaction in metabolite.reactions:
                flux = solution.fluxes[reaction.id]  # Get the flux for the reaction
                new_concentration = max(old_concentration + flux, 0)  # Update concentration, keep above 0
                substrate_update[lookup_key] = float(new_concentration - old_concentration)
        return substrate_update

    def dynamic_fba_system(self, t, y):
        """Placeholder for the dynamic-FBA right-hand side; currently returns None."""
        pass

    def run_dynamic_fba(self, time_points, y0=None):
        """Integrate ``dynamic_fba_system`` over ``time_points`` with SciPy's BDF solver.

        ``dynamic_fba_system`` is still a placeholder, so the integration cannot
        succeed until it returns derivatives.

        Args:
            time_points: times at which the solution is evaluated.
            y0: initial state vector; defaults to the current substrate values.

        Returns:
            The object returned by ``scipy.integrate.solve_ivp``.
        """
        from scipy.integrate import solve_ivp

        if y0 is None:
            y0 = list((self.substrates_input or {}).values())

        return solve_ivp(
            fun=self.dynamic_fba_system,
            t_span=(min(time_points), max(time_points)),
            y0=y0,
            t_eval=time_points,
            method='BDF',  # TODO: change to the stiff solver if needed
            rtol=1e-6,
            atol=1e-8
        )


In [4]:
# Register both process classes under the addresses used by the composite spec.
for process_address, process_class in (('fba', CobraProcess), ('copasi', CopasiProcess)):
    CORE.process_registry.register(process_address, process_class)

In [5]:
# Root of the biosimulator-processes checkout. Set BIOSIMULATOR_PROCESSES_ROOT to
# run this notebook on another machine; the default keeps the original location.
repo_root = Path(os.environ.get(
    'BIOSIMULATOR_PROCESSES_ROOT',
    '/Users/alexanderpatrie/Desktop/repos/biosimulator-processes'))

# Kept as a '/'-separated string: CopasiProcess treats a source containing '/' as a file path.
model_fp = (
    repo_root
    / 'test_suite'
    / 'examples'
    / 'sbml-core'
    / 'Elowitz-Nature-2000-Repressilator'
    / 'BIOMD0000000012_url.xml'
).as_posix()

# ODE process: COPASI time course over the SBML model.
copasi_spec = {
    'ode': {
        '_type': 'process',
        'address': 'local:copasi',
        'config': {
            'model': {
                'model_source': model_fp
            }
        },
        'inputs': {
            'time': ['time_store'],
            'floating_species_concentrations': ['floating_species_concentrations_store'],
            'model_parameters': ['model_parameters_store'],
            'reactions': ['reactions_store']
        },
        'outputs': {
            'time': ['time_store'],
            'floating_species_concentrations': ['floating_species_concentrations_store'],
            'reactions': ['reactions_store']
        }
    }
}

# FBA process: COBRA model read from the same SBML file.
# NOTE(review): these input paths start with 'ode', unlike the top-level store
# names the ODE process writes to - confirm this is the intended wiring.
fba_spec = {
    'fba': {
        '_type': 'process',
        'address': 'local:fba',
        'config': {
            'model': {
                'model_source': model_fp
            }
        },
        'inputs': {
            'substrates': ['ode', 'floating_species_concentrations_store'],
            'reactions': ['ode', 'reactions_store'],
        },
        'outputs': {
            'substrates': ['substrates_store'],
            'substrates_input': ['substrates_input_store']
        }
    }
}

# Emitter step: records the listed stores.
emitter_spec = {
    'emitter': {
        '_type': 'step',
        'address': 'local:ram-emitter',
        'config': {
            'emit': {
                'floating_species_concentrations': 'tree[float]',
                'substrates': 'tree[float]',
                'substrates_input': 'tree[float]',
                'time': 'float'
            }
        },
        'inputs': {
            'floating_species_concentrations': ['floating_species_concentrations_store'],
            'substrates': ['substrates_store'],
            'substrates_input': ['substrates_input_store'],
            'time': ['time_store']
        }
    }
}

# Merge the three node specs into one composite state (shallow merge; keys are distinct).
spec = {**copasi_spec, **fba_spec, **emitter_spec}

In [6]:
from process_bigraph import Composite


# Build the composite from the merged spec, using the shared CORE registry.
comp = Composite(
    core=CORE,
    config=dict(state=spec),
)

Model does not contain SBML fbc package information.
SBML package 'layout' not supported by cobrapy, information is not parsed
SBML package 'render' not supported by cobrapy, information is not parsed
Missing lower flux bound set to '-1000.0' for reaction: '<Reaction Reaction1 "degradation of LacI transcripts">'
Missing upper flux bound set to '1000.0' for reaction: '<Reaction Reaction1 "degradation of LacI transcripts">'
Missing lower flux bound set to '-1000.0' for reaction: '<Reaction Reaction2 "degradation of TetR transcripts">'
Missing upper flux bound set to '1000.0' for reaction: '<Reaction Reaction2 "degradation of TetR transcripts">'
Missing lower flux bound set to '-1000.0' for reaction: '<Reaction Reaction3 "degradation of CI transcripts">'
Missing upper flux bound set to '1000.0' for reaction: '<Reaction Reaction3 "degradation of CI transcripts">'
Missing lower flux bound set to '-1000.0' for reaction: '<Reaction Reaction4 "translation of LacI">'
Missing upper flux bound se

found a filepath


In [8]:
from cobra.io import sbml

# Validate the SBML file with cobrapy; the returned (model, errors) pair is shown as the cell output.
sbml.validate_sbml_model(model_fp)

COBRA errors in validation, check error log for details.


(<Model BIOMD0000000012 at 0x35c017b80>,
 {'SBML_FATAL': [],
  'SBML_ERROR': [],
  'SBML_SCHEMA_ERROR': [],
  'COBRA_FATAL': [],
  'COBRA_ERROR': ['No objective coefficients in model. Unclear what should be optimized'],
   "SBML package 'layout' not supported by cobrapy, information is not parsed",
   "SBML package 'render' not supported by cobrapy, information is not parsed",
   'Missing lower flux bound set to \'-1000.0\' for reaction: \'<Reaction Reaction1 "degradation of LacI transcripts">\'',
   'Missing upper flux bound set to \'1000.0\' for reaction: \'<Reaction Reaction1 "degradation of LacI transcripts">\'',
   'Missing lower flux bound set to \'-1000.0\' for reaction: \'<Reaction Reaction2 "degradation of TetR transcripts">\'',
   'Missing upper flux bound set to \'1000.0\' for reaction: \'<Reaction Reaction2 "degradation of TetR transcripts">\'',
   'Missing lower flux bound set to \'-1000.0\' for reaction: \'<Reaction Reaction3 "degradation of CI transcripts">\'',
   'Missi

In [15]:
# Run the composite for an interval of 10.
comp.run(10)


Series.__getitem__ treating keys as positions is deprecated. In a future version, integer keys will always be treated as labels (consistent with DataFrame behavior). To access a value by position, use `ser.iloc[pos]`



In [16]:
# Collect the results from the composite (presumably what the emitter recorded - confirm).
results = comp.gather_results()

In [None]:
# Display the gathered results (this cell has no execution count in the saved notebook).
results

In [11]:
m = comp.state['fba']['instance']

# Show the compartment and name of every metabolite in the FBA process's model.
fba_metabolites = m.model.metabolites
for metabolite in fba_metabolites:
    print(f'{metabolite.compartment} {metabolite.name}')

cell LacI protein
cell TetR protein
cell cI protein
cell LacI mRNA
cell TetR mRNA
cell cI mRNA


In [12]:
# Exploration: list the attribute names available on the FBA process's model.
print(dir(m.model))

['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__enter__', '__eq__', '__exit__', '__format__', '__ge__', '__getattribute__', '__getstate__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__setstate__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_annotation', '_compartments', '_contexts', '_id', '_populate_solver', '_repr_html_', '_sbml', '_set_id_with_model', '_solver', '_tolerance', 'add_boundary', 'add_cons_vars', 'add_groups', 'add_metabolites', 'add_reactions', 'annotation', 'boundary', 'compartments', 'constraints', 'copy', 'demands', 'exchanges', 'genes', 'get_associated_groups', 'groups', 'id', 'medium', 'merge', 'metabolites', 'name', 'notes', 'objective', 'objective_direction', 'optimize', 'problem', 'reactions', 'remove_cons_vars', 'remove_groups', 'remove_metabolites', 'remove_reactions', 'repair', 'sinks', 'slim_optimi

In [14]:
# Exploration: list the attribute names available on the model's objective.
dir(m.model.objective)

['__annotations__',
 '__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__gt__',
 '__hash__',
 '__iadd__',
 '__idiv__',
 '__imul__',
 '__init__',
 '__init_subclass__',
 '__isub__',
 '__itruediv__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_canonicalize',
 '_direction',
 '_expression',
 '_expression_expired',
 '_get_expression',
 '_name',
 '_problem',
 '_substitute_variables',
 '_validate_optimization_expression_name',
 '_value',
 'clone',
 'direction',
 'expression',
 'from_json',
 'get_linear_coefficients',
 'is_Linear',
 'is_Quadratic',
 'name',
 'problem',
 'set_linear_coefficients',
 'to_json',
 'value',
 'variables']

In [18]:
# Show the current objective expression of the FBA process's model.
m.model.objective.expression

0

In [20]:
from optlang.symbolics import Zero

model = m.model
reactions = model.reactions
weights = [1.0 for _ in reactions]

# Reset the objective, then replace it with the weighted sum of all reaction fluxes.
model.objective = Zero
objective_expression = sum(
    (weight * reaction.flux_expression for reaction, weight in zip(reactions, weights)),
    Zero,
)

model.objective = objective_expression

In [22]:
# Show the optimization direction of the objective set above.
model.objective.direction

'max'