Stabilize sampling (#672)
* Improve convergence and stability of sampling.

Fixes several numerical issues with sampling. Some limitations remain,
mostly because it is hard to maintain a numerically stable online
calculation of the mean. Also finally fixes the possibility of a
RecursionError. Sampling now raises a real and informative error if it
cannot escape a small sampling region.
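
A rough usage sketch of the two samplers touched by this commit (the test model, seed, and sample counts are illustrative, not part of the diff):

from cobra.test import create_test_model
from cobra.flux_analysis.sampling import ACHRSampler, OptGPSampler

model = create_test_model("textbook")

# Serial sampler; with this change nproj defaults to
# min(len(model.variables) ** 3, 1e6) and can also be passed explicitly.
achr = ACHRSampler(model, thinning=100, seed=42)
s1 = achr.sample(100)    # one flux sample per row

# Parallel sampler; the retries reported by the individual chains are
# summed up on the sampler instance.
optgp = OptGPSampler(model, processes=4, thinning=100, seed=42)
s2 = optgp.sample(400)
print(optgp.retries)     # persistently large values indicate numerical instability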
cdiener authored and Midnighter committed Mar 3, 2018
1 parent 818475f commit 290535a
Showing 4 changed files with 374 additions and 204 deletions.
211 changes: 150 additions & 61 deletions cobra/flux_analysis/sampling.py
@@ -24,18 +24,14 @@
LOGGER = getLogger(__name__)
"""The logger for the package."""

bounds_tol = np.finfo(np.float32).eps
bounds_tol = 1e-6
"""The tolerance used for checking bounds feasibility."""

feasibility_tol = bounds_tol
feasibility_tol = 1e-6
"""The tolerance used for checking equalities feasibility."""

nproj = 1000000
"""Reproject the solution into the feasibility space every nproj iterations."""

nproj_center = 10000
"""Reproject the center into the nullspace every nproj_center iterations.
Only used for inhomogeneous problems."""
max_tries = 100
"""Maximum number of retries for sampling."""

Problem = namedtuple("Problem",
["equalities", "b", "inequalities", "bounds",
@@ -106,7 +102,7 @@ def shared_np_array(shape, data=None, integer=False):


# Has to be declared outside of class to be used for multiprocessing :(
def _step(sampler, x, delta, fraction=None):
def _step(sampler, x, delta, fraction=None, tries=0):
"""Sample a new feasible point from the point `x` in direction `delta`."""
prob = sampler.problem
valid = ((np.abs(delta) > feasibility_tol) &
@@ -116,14 +112,19 @@ def _step(sampler, x, delta, fraction=None):
valphas = (valphas / delta[valid]).flatten()
if prob.bounds.shape[0] > 0:
# permissible alphas for staying in constraint bounds
ineqs = prob.inequalities.dot(delta)
valid = np.abs(ineqs) > feasibility_tol
balphas = ((1.0 - bounds_tol) * prob.bounds -
prob.inequalities.dot(x))
balphas = (balphas / prob.inequalities.dot(delta)).flatten()
prob.inequalities.dot(x))[:, valid]
balphas = (balphas / ineqs[valid]).flatten()
# combined alphas
alphas = np.hstack([valphas, balphas])
else:
alphas = valphas
alpha_range = (alphas[alphas > 0.0].min(), alphas[alphas <= 0.0].max())
pos_alphas = alphas[alphas > 0.0]
neg_alphas = alphas[alphas <= 0.0]
alpha_range = np.array([neg_alphas.max() if len(neg_alphas) > 0 else 0,
pos_alphas.min() if len(pos_alphas) > 0 else 0])

if fraction:
alpha = alpha_range[0] + fraction * (alpha_range[1] - alpha_range[0])
@@ -133,12 +134,21 @@

# Numerical instabilities may cause bounds invalidation
# reset sampler and sample from one of the original warmup directions
# if that occurs
if np.any(sampler._bounds_dist(p) < -bounds_tol):
# if that occurs. Also reset if we got stuck.
if (np.any(sampler._bounds_dist(p) < -bounds_tol) or
np.abs(np.abs(alpha_range).max() * delta).max() < bounds_tol):
if tries > max_tries:
raise RuntimeError("Can not escape sampling region, model seems"
" numerically unstable :( Reporting the "
"model to "
"https://github.com/opencobra/cobrapy/issues "
"will help us to fix this :)")
LOGGER.info("found bounds infeasibility in sample, "
"resetting to center")
newdir = sampler.warmup[np.random.randint(sampler.n_warmup)]
return _step(sampler, sampler.center, newdir - sampler.center)
sampler.retries += 1
return _step(sampler, sampler.center, newdir - sampler.center, None,
tries + 1)
return p


@@ -152,6 +162,13 @@ class HRSampler(object):
thinning : int
The thinning factor of the generated sampling chain. A thinning of 10
means samples are returned every 10 steps.
nproj : int > 0, optional
How often to reproject the sampling point into the feasibility space.
Avoids numerical issues at the cost of lower sampling speed. If you observe
many equality constraint violations with `sampler.validate` you should
lower this number.
seed : int > 0, optional
The random number seed that should be used.
Attributes
----------
@@ -162,13 +179,18 @@
n_samples : int
The total number of samples that have been generated by this
sampler instance.
retries : int
The overall number of sampling retries the sampler has observed. Larger
values indicate numerical instabilities.
problem : collections.namedtuple
A python object whose attributes define the entire sampling problem in
matrix form. See docstring of `Problem`.
warmup : a numpy matrix
A matrix with as many columns as reactions in the model and more
than 3 rows containing a warmup sample in each row. None if no warmup
points have been generated yet.
nproj : int
How often to reproject the sampling point into the feasibility space.
seed : positive integer, optional
Sets the random number seed. Initialized to the current time stamp if
None.
@@ -180,7 +202,7 @@
the respective reverse variable.
"""

def __init__(self, model, thinning, seed=None):
def __init__(self, model, thinning, nproj=None, seed=None):
"""Initialize a new sampler object."""
# This currently has to be done to reset the solver basis which is
# required to get deterministic warmup point generation
@@ -189,7 +211,12 @@ def __init__(self, model, thinning, seed=None):
raise TypeError("sampling does not work with integer problems :(")
self.model = model.copy()
self.thinning = thinning
if nproj is None:
self.nproj = int(min(len(self.model.variables)**3, 1e6))
else:
self.nproj = nproj
self.n_samples = 0
self.retries = 0
self.problem = self.__build_problem()
# Set up a map from reaction -> forward/reverse variable
var_idx = {v: idx for idx, v in enumerate(model.variables)}
@@ -252,32 +279,54 @@ def generate_fva_warmup(self):
if necessary).
"""
self.n_warmup = 0
idx = np.hstack([self.fwd_idx, self.rev_idx])
self.warmup = np.zeros((len(idx), len(self.model.variables)))
reactions = self.model.reactions
self.warmup = np.zeros((2 * len(reactions), len(self.model.variables)))
self.model.objective = Zero
self.model.objective.direction = "max"
variables = self.model.variables
for i in idx:
# Omit fixed reactions
if self.problem.variable_fixed[i]:
LOGGER.info("skipping fixed variable %s" %
variables[i].name)
continue
self.model.objective.set_linear_coefficients({variables[i]: 1})
self.model.slim_optimize()
if not self.model.solver.status == OPTIMAL:
LOGGER.info("can not maximize variable %s, skipping it" %
variables[i].name)
continue
primals = self.model.solver.primal_values
sol = [primals[v.name] for v in self.model.variables]
self.warmup[self.n_warmup, ] = sol
for sense in ("min", "max"):
self.model.objective_direction = sense
for i, r in enumerate(reactions):
variables = (self.model.variables[self.fwd_idx[i]],
self.model.variables[self.rev_idx[i]])
# Omit fixed reactions if they are non-homogeneous
if r.upper_bound - r.lower_bound < bounds_tol:
LOGGER.info("skipping fixed reaction %s" % r.id)
continue
self.model.objective.set_linear_coefficients(
{variables[0]: 1, variables[1]: -1})
self.model.slim_optimize()
if not self.model.solver.status == OPTIMAL:
LOGGER.info("can not maximize reaction %s, skipping it" %
r.id)
continue
primals = self.model.solver.primal_values
sol = [primals[v.name] for v in self.model.variables]
self.warmup[self.n_warmup, ] = sol
self.n_warmup += 1
# Reset objective
self.model.objective.set_linear_coefficients(
{variables[0]: 0, variables[1]: 0})
# Shrink to measure
self.warmup = self.warmup[0:self.n_warmup, :]
# Remove redundant search directions
keep = np.logical_not(self._is_redundant(self.warmup))
self.warmup = self.warmup[keep, :]
self.n_warmup = self.warmup.shape[0]

# Catch some special cases
if len(self.warmup.shape) == 1 or self.warmup.shape[0] == 1:
raise ValueError("Your flux cone consists only of a single point!")
elif self.n_warmup == 2:
if not self.problem.homogeneous:
raise ValueError("Can not sample from an inhomogenous problem"
" with only 2 search directions :(")
LOGGER.info("All search directions on a line, adding another one.")
newdir = self.warmup.T.dot([0.25, 0.25])
self.warmup = np.vstack([self.warmup, newdir])
self.n_warmup += 1
# revert objective
self.model.objective.set_linear_coefficients({variables[i]: 0})

# Shrink warmup points to measure
self.warmup = shared_np_array((self.n_warmup, len(variables)),
self.warmup[0:self.n_warmup, ])
self.warmup = shared_np_array(
(self.n_warmup, len(self.model.variables)), self.warmup)

def _reproject(self, p):
"""Reproject a point into the feasibility region.
@@ -307,14 +356,28 @@ def _reproject(self, p):
new = nulls.dot(nulls.T.dot(p))
# Projections may violate bounds
# set to random point in space in that case
if any(self._bounds_dist(new) < -bounds_tol):
if any(new != p):
LOGGER.info("reprojection failed in sample"
" %d, using random point in space" % self.n_samples)
idx = np.random.randint(self.n_warmup,
size=min(2, int(np.sqrt(self.n_warmup))))
new = self.warmup[idx, :].mean(axis=0)
new = self._random_point()
return new

def _random_point(self):
"""Find an approximately random point in the flux cone."""
idx = np.random.randint(self.n_warmup,
size=min(2, np.ceil(np.sqrt(self.n_warmup))))
return self.warmup[idx, :].mean(axis=0)

def _is_redundant(self, matrix, cutoff=1.0 - feasibility_tol):
"""Identify rdeundant rows in a matrix that can be removed."""
# Avoid zero variances
extra_col = matrix[:, 0] + 1
# Avoid zero rows being correlated with constant rows
extra_col[matrix.sum(axis=1) == 0] = 2
corr = np.corrcoef(np.c_[matrix, extra_col])
corr = np.tril(corr, -1)
return (np.abs(corr) > cutoff).any(axis=1)

def _bounds_dist(self, p):
"""Get the lower and upper bound distances. Negative is bad."""
prob = self.problem
@@ -445,7 +508,12 @@ class ACHRSampler(HRSampler):
thinning : int, optional
The thinning factor of the generated sampling chain. A thinning of 10
means samples are returned every 10 steps.
seed : positive integer, optional
nproj : int > 0, optional
How often to reproject the sampling point into the feasibility space.
Avoids numerical issues at the cost of lower sampling speed. If you observe
many equality constraint violations with `sampler.validate` you should
lower this number.
seed : int > 0, optional
Sets the random number seed. Initialized to the current time stamp if
None.
@@ -465,9 +533,14 @@ class ACHRSampler(HRSampler):
A matrix with as many columns as reactions in the model and more
than 3 rows containing a warmup sample in each row. None if no warmup
points have been generated yet.
retries : int
The overall number of sampling retries the sampler has observed. Larger
values indicate numerical instabilities.
seed : positive integer, optional
Sets the random number seed. Initialized to the current time stamp if
None.
nproj : int
How often to reproject the sampling point into the feasibility space.
fwd_idx : np.array
Has one entry for each reaction in the model containing the index of
the respective forward variable.
@@ -503,9 +576,10 @@ class ACHRSampler(HRSampler):
.. [2] https://github.com/opencobra/cobratoolbox
"""

def __init__(self, model, thinning=100, seed=None):
def __init__(self, model, thinning=100, nproj=None, seed=None):
"""Initialize a new ACHRSampler."""
super(ACHRSampler, self).__init__(model, thinning, seed=seed)
super(ACHRSampler, self).__init__(model, thinning, nproj=nproj,
seed=seed)
self.generate_fva_warmup()
self.prev = self.center = self.warmup.mean(axis=0)
np.random.seed(self._seed)
@@ -516,10 +590,11 @@ def __single_iteration(self):
delta = self.warmup[pi, ] - self.center
self.prev = _step(self, self.prev, delta)
if self.problem.homogeneous and (self.n_samples *
self.thinning % nproj == 0):
self.thinning % self.nproj == 0):
self.prev = self._reproject(self.prev)
self.center = (self.n_samples * self.center + self.prev) / (
self.n_samples + 1)
self.center = self._reproject(self.center)
self.center = ((self.n_samples * self.center) / (self.n_samples + 1) +
self.prev / (self.n_samples + 1))
self.n_samples += 1

def sample(self, n, fluxes=True):
@@ -584,15 +659,17 @@ def _sample_chain(args):
delta = sampler.warmup[pi, ] - center

prev = _step(sampler, prev, delta)
if sampler.problem.homogeneous and (n_samples *
sampler.thinning % nproj == 0):
if sampler.problem.homogeneous and (
n_samples * sampler.thinning % sampler.nproj == 0):
prev = sampler._reproject(prev)
center = sampler._reproject(center)
if i % sampler.thinning == 0:
samples[i//sampler.thinning - 1, ] = prev
center = (n_samples * center + prev) / (n_samples + 1)
n_samples += 1
center = ((n_samples * center) / (n_samples + 1) +
prev / (n_samples + 1))
n_samples += 1

return samples
return (sampler.retries, samples)


class OptGPSampler(HRSampler):
Expand All @@ -610,7 +687,12 @@ class OptGPSampler(HRSampler):
thinning : int, optional
The thinning factor of the generated sampling chain. A thinning of 10
means samples are returned every 10 steps.
seed : positive integer, optional
nproj : int > 0, optional
How often to reproject the sampling point into the feasibility space.
Avoids numerical issues at the cost of lower sampling speed. If you observe
many equality constraint violations with `sampler.validate` you should
lower this number.
seed : int > 0, optional
Sets the random number seed. Initialized to the current time stamp if
None.
@@ -630,9 +712,14 @@ class OptGPSampler(HRSampler):
A matrix with as many columns as reactions in the model and more
than 3 rows containing a warmup sample in each row. None if no warmup
points have been generated yet.
retries : int
The overall number of sampling retries the sampler has observed. Larger
values indicate numerical instabilities.
seed : positive integer, optional
Sets the random number seed. Initialized to the current time stamp if
None.
nproj : int
How often to reproject the sampling point into the feasibility space.
fwd_idx : np.array
Has one entry for each reaction in the model containing the index of
the respective forward variable.
@@ -672,7 +759,8 @@ class OptGPSampler(HRSampler):
https://doi.org/10.1371/journal.pone.0086587
"""

def __init__(self, model, processes, thinning=100, seed=None):
def __init__(self, model, processes, thinning=100, nproj=None,
seed=None):
"""Initialize a new OptGPSampler."""
super(OptGPSampler, self).__init__(model, thinning, seed=seed)
self.generate_fva_warmup()
Expand Down Expand Up @@ -725,18 +813,19 @@ def sample(self, n, fluxes=True):
# No with statement or starmap here since Python 2.x
# does not support it :(
mp = Pool(self.processes, initializer=mp_init, initargs=(self,))
chains = mp.map(_sample_chain, args, chunksize=1)
results = mp.map(_sample_chain, args, chunksize=1)
mp.close()
mp.join()
chains = np.vstack(chains)
chains = np.vstack([r[1] for r in results])
self.retries += sum(r[0] for r in results)
else:
mp_init(self)
chains = _sample_chain((n, 0))
results = _sample_chain((n, 0))
chains = results[1]

# Update the global center
self.center = (self.n_samples * self.center +
n * np.atleast_2d(chains).mean(axis=0)) / (
self.n_samples + n)
np.atleast_2d(chains).sum(0)) / (self.n_samples + n)
self.n_samples += n

if fluxes:
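As a side note on the reworked center update: both __single_iteration and _sample_chain now split the online mean into two separately divided terms. A minimal standalone sketch of that form (the helper name and toy data are ours, not part of the diff):

import numpy as np

def update_center(center, new_point, n_samples):
    # Online mean in the form used by this commit: dividing each term
    # separately keeps intermediate values smaller than first forming
    # n_samples * center + new_point, which is gentler numerically.
    return (n_samples * center) / (n_samples + 1) + new_point / (n_samples + 1)

center = np.zeros(3)
points = np.random.rand(1000, 3)
for i, p in enumerate(points):
    center = update_center(center, p, i)

# The running update reproduces the plain mean up to floating-point error.
assert np.allclose(center, points.mean(axis=0))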
