add parallel sampling to SMC (#3367)

* add parallel sampling to SMC

* fix test

* fix test, second attempt

* set parallel=False
aloctavodia committed Feb 9, 2019
1 parent 5e1bc75 commit 4f1a291c331576212a352deb8fc3c54c9f0b27cd
Showing with 305 additions and 251 deletions.
  1. +16 −13 pymc3/sampling.py
  2. +87 −36 pymc3/step_methods/smc.py
  3. +202 −202 pymc3/tests/test_step.py
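
To put the diff in context, a minimal usage sketch of the new option (the toy model and numbers below are illustrative, not part of the commit; it assumes the pymc3 API as of this change):

import pymc3 as pm

with pm.Model() as model:
    mu = pm.Normal("mu", 0.0, 1.0)
    pm.Normal("obs", mu=mu, sd=1.0, observed=[0.1, -0.2, 0.3])
    # parallel=True (the default) lets SMC distribute the Metropolis
    # mutation of the particles over `cores` worker processes;
    # with parallel=False the `cores` argument is ignored for SMC.
    trace = pm.sample(draws=1000, step=pm.SMC(parallel=True), cores=2)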
@@ -247,9 +247,9 @@ def sample(draws=500, step=None, init='auto', n_init=200000, start=None, trace=N
number of draws.
cores : int
The number of chains to run in parallel. If `None`, set to the number of CPUs in the
system, but at most 4 (for 'SMC' defaults to 1). Keep in mind that some chains might
themselves be multithreaded via openmp or BLAS. In those cases it might be faster to set
this to 1.
system, but at most 4 (for 'SMC' ignored if `pm.SMC(parallel=False)`). Keep in mind that
some chains might themselves be multithreaded via openmp or BLAS. In those cases it might
be faster to set this to 1.
tune : int
Number of iterations to tune, defaults to 500. Ignored when using 'SMC'. Samplers adjust
the step sizes, scalings or similar during tuning. Tuning samples will be drawn in addition
@@ -319,25 +319,27 @@ def sample(draws=500, step=None, init='auto', n_init=200000, start=None, trace=N
nuts_kwargs = kwargs.pop('nuts_kwargs', None)
if nuts_kwargs is not None:
warnings.warn("The nuts_kwargs argument has been deprecated. Pass step "
"method arguments directly to sample instead",
DeprecationWarning)
"method arguments directly to sample instead",
DeprecationWarning)
kwargs.update(nuts_kwargs)
step_kwargs = kwargs.pop('step_kwargs', None)
if step_kwargs is not None:
warnings.warn("The step_kwargs argument has been deprecated. Pass step "
"method arguments directly to sample instead",
DeprecationWarning)
"method arguments directly to sample instead",
DeprecationWarning)
kwargs.update(step_kwargs)

if cores is None:
cores = min(4, _cpu_count())

if isinstance(step, pm.step_methods.smc.SMC):
trace = smc.sample_smc(draws=draws,
step=step,
cores=cores,
progressbar=progressbar,
model=model,
random_seed=random_seed)
else:
if cores is None:
cores = min(4, _cpu_count())
if 'njobs' in kwargs:
cores = kwargs['njobs']
warnings.warn(
@@ -423,12 +425,12 @@ def sample(draws=500, step=None, init='auto', n_init=200000, start=None, trace=N
'random_seed': random_seed,
'live_plot': live_plot,
'live_plot_kwargs': live_plot_kwargs,
'cores': cores,}
'cores': cores, }

sample_args.update(kwargs)

has_population_samplers = np.any([isinstance(m, arraystep.PopulationArrayStepShared)
for m in (step.methods if isinstance(step, CompoundStep) else [step])])
for m in (step.methods if isinstance(step, CompoundStep) else [step])])

parallel = cores > 1 and chains > 1 and not has_population_samplers
if parallel:
@@ -1127,7 +1129,7 @@ def sample_ppc(*args, **kwargs):


def sample_posterior_predictive_w(traces, samples=None, models=None, weights=None,
random_seed=None, progressbar=True):
random_seed=None, progressbar=True):
"""Generate weighted posterior predictive samples from a list of models and
a list of traces according to a set of weights.
@@ -1306,7 +1308,8 @@ def sample_prior_predictive(samples=500, model=None, vars=None, random_seed=None
elif is_transformed_name(var_name):
untransformed = get_untransformed_name(var_name)
if untransformed in data:
prior[var_name] = model[untransformed].transformation.forward_val(data[untransformed])
prior[var_name] = model[untransformed].transformation.forward_val(
data[untransformed])
return prior


@@ -5,6 +5,7 @@
import theano
import pymc3 as pm
from tqdm import tqdm
import multiprocessing as mp

from .arraystep import metrop_select
from .metropolis import MultivariateNormalProposal
@@ -43,6 +44,9 @@ class SMC:
Determines the change of beta from stage to stage, i.e. indirectly the number of stages,
the higher the value of `threshold` the higher the number of stages. Defaults to 0.5.
It should be between 0 and 1.
parallel : bool
Distribute computations across cores if the number of cores is larger than 1
(see pm.sample() for details). Defaults to True.
model : :class:`pymc3.Model`
Optional model for sampling step. Defaults to None (taken from context).
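
For readers unfamiliar with the tempering that `beta` and `threshold` refer to: each SMC stage targets the tempered posterior, which in the notation of the code further down is

    tempered_logp(q) = prior_logp(q) + beta * likelihood_logp(q)

with beta raised from 0 toward 1 across stages; the higher the `threshold`, the smaller each beta increment is allowed to be and hence the more stages are run.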
@@ -68,6 +72,7 @@ def __init__(
tune_scaling=True,
tune_steps=True,
threshold=0.5,
parallel=True,
):

self.n_steps = n_steps
@@ -77,9 +82,10 @@ def __init__(
self.tune_scaling = tune_scaling
self.tune_steps = tune_steps
self.threshold = threshold
self.parallel = parallel


def sample_smc(draws=5000, step=None, progressbar=False, model=None, random_seed=-1):
def sample_smc(draws=5000, step=None, cores=None, progressbar=False, model=None, random_seed=-1):
"""
Sequential Monte Carlo sampling
@@ -90,6 +96,8 @@ def sample_smc(draws=5000, step=None, progressbar=False, model=None, random_seed
independent Markov Chains. Defaults to 5000.
step : :class:`SMC`
SMC initialization object
cores : int
The number of chains to run in parallel.
progressbar : bool
Flag for displaying a progress bar
model : pymc3 Model
@@ -102,9 +110,10 @@ def sample_smc(draws=5000, step=None, progressbar=False, model=None, random_seed
if random_seed != -1:
np.random.seed(random_seed)

beta = 0.
beta = 0.0
stage = 0
acc_rate = 1.
accepted = 0
acc_rate = 1.0
proposed = draws * step.n_steps
model.marginal_likelihood = 1
variables = inputvars(model.vars)
@@ -138,53 +147,95 @@ def sample_smc(draws=5000, step=None, progressbar=False, model=None, random_seed
if step.tune_scaling:
step.scaling = _tune(acc_rate)
if step.tune_steps:
acc_rate = max(1. / proposed, acc_rate)
acc_rate = max(1.0 / proposed, acc_rate)
step.n_steps = min(
step.max_steps, 1 + int(np.log(step.p_acc_rate) / np.log(1 - acc_rate))
)

pm._log.info(
"Stage: {:d} Beta: {:f} Steps: {:d}".format(stage, beta, step.n_steps, acc_rate)
)
pm._log.info("Stage: {:d} Beta: {:.3f} Steps: {:d}".format(stage, beta, step.n_steps))
# Apply Metropolis kernel (mutation)
proposed = draws * step.n_steps
accepted = 0.
priors = np.array([prior_logp(sample) for sample in posterior]).squeeze()
tempered_post = priors + likelihoods * beta
for draw in tqdm(range(draws), disable=not progressbar):
old_tempered_post = tempered_post[draw]
q_old = posterior[draw]
deltas = np.squeeze(proposal(step.n_steps) * step.scaling)
for n_step in range(step.n_steps):
delta = deltas[n_step]

if any_discrete:
if all_discrete:
delta = np.round(delta, 0).astype("int64")
q_old = q_old.astype("int64")
q_new = (q_old + delta).astype("int64")
else:
delta[discrete] = np.round(delta[discrete], 0)
q_new = floatX(q_old + delta)
else:
q_new = floatX(q_old + delta)

new_tempered_post = prior_logp(q_new) + likelihood_logp(q_new)[0] * beta

q_old, accept = metrop_select(new_tempered_post - old_tempered_post, q_new, q_old)
if accept:
accepted += 1
posterior[draw] = q_old
old_tempered_post = new_tempered_post

acc_rate = accepted / proposed
tempered_logp = priors + likelihoods * beta
deltas = np.squeeze(proposal(step.n_steps) * step.scaling)

parameters = (
proposal,
step.scaling,
accepted,
any_discrete,
all_discrete,
discrete,
step.n_steps,
prior_logp,
likelihood_logp,
beta,
)

if step.parallel and cores > 1:
pool = mp.Pool(processes=cores)
results = pool.starmap(
_metrop_kernel,
[(posterior[draw], tempered_logp[draw], *parameters) for draw in range(draws)],
)
else:
results = [
_metrop_kernel(posterior[draw], tempered_logp[draw], *parameters)
for draw in tqdm(range(draws), disable=not progressbar)
]

posterior, acc_list = zip(*results)
posterior = np.array(posterior)
acc_rate = sum(acc_list) / proposed
stage += 1

trace = _posterior_to_trace(posterior, variables, model, var_info)

return trace


def _metrop_kernel(
q_old,
old_tempered_logp,
proposal,
scaling,
accepted,
any_discrete,
all_discrete,
discrete,
n_steps,
prior_logp,
likelihood_logp,
beta,
):
"""
Metropolis kernel
"""
deltas = np.squeeze(proposal(n_steps) * scaling)
for n_step in range(n_steps):
delta = deltas[n_step]

if any_discrete:
if all_discrete:
delta = np.round(delta, 0).astype("int64")
q_old = q_old.astype("int64")
q_new = (q_old + delta).astype("int64")
else:
delta[discrete] = np.round(delta[discrete], 0)
q_new = floatX(q_old + delta)
else:
q_new = floatX(q_old + delta)

new_tempered_logp = prior_logp(q_new) + likelihood_logp(q_new)[0] * beta

q_old, accept = metrop_select(new_tempered_logp - old_tempered_logp, q_new, q_old)
if accept:
accepted += 1
old_tempered_logp = new_tempered_logp

return q_old, accepted


def _initial_population(draws, model, variables):
"""
Create an initial population from the prior
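
As a closing note on the parallel path added above: the per-draw state (particle and its tempered log-probability) is paired with a fixed tuple of shared arguments and fanned out with `Pool.starmap`, one task per particle. A self-contained toy sketch of the same pattern (all names and values here are illustrative, not from the commit):

import multiprocessing as mp
import numpy as np

def _toy_kernel(x, logp, scaling, n_steps):
    # Random-walk Metropolis on one particle; same (state, *shared) call shape
    # as _metrop_kernel above, targeting a standard normal for simplicity.
    accepted = 0
    for _ in range(n_steps):
        x_new = x + scaling * np.random.randn()
        logp_new = -0.5 * x_new ** 2
        if np.log(np.random.rand()) < logp_new - logp:
            x, logp = x_new, logp_new
            accepted += 1
    return x, accepted

if __name__ == "__main__":
    draws, shared = 1000, (0.5, 20)   # (scaling, n_steps) shared by every task
    particles = np.random.randn(draws)
    logps = -0.5 * particles ** 2
    with mp.Pool(processes=4) as pool:
        results = pool.starmap(
            _toy_kernel,
            [(particles[i], logps[i], *shared) for i in range(draws)],
        )
    particles, acc = map(np.array, zip(*results))
    print("acceptance rate:", acc.sum() / (draws * shared[1]))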