Commit 7fc1ea4

removed n_jobs keyword argument, pool now shared between likelihood calls and sampler calculations by default

johannesulf committed May 27, 2023
1 parent 1cba050 commit 7fc1ea4
Showing 6 changed files with 67 additions and 53 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -10,6 +10,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- Early stopping for neural network training is now default.

### Deprecated
- The `n_jobs` keyword argument for the sampler has been deprecated. The pool used for likelihood calls is now also used for sampler parallelization by default. To use independent pools for likelihood calls and sampler calculations, pass a tuple to `pool`.

## [0.6.0] - 2023-04-22

### Changed
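In practice, the deprecation entry above corresponds to call patterns like the following. This is a minimal sketch with a toy prior transform and likelihood, not code taken from the repository:

```python
# Minimal usage sketch of the new `pool` argument (toy prior and likelihood,
# not taken from the repository).
from multiprocessing import Pool

import numpy as np
from nautilus import Sampler


def prior(x):
    return x  # identity prior transform on the unit cube (placeholder)


def likelihood(x):
    return -np.linalg.norm(x - 0.5)**2  # simple peaked toy likelihood


if __name__ == "__main__":
    # A single integer creates one multiprocessing pool that is shared between
    # likelihood calls and sampler calculations (the role n_jobs used to fill).
    sampler = Sampler(prior, likelihood, n_dim=2, pool=4)

    # A tuple keeps the two roles separate: the first entry parallelizes
    # likelihood calls, the second the sampler calculations.
    sampler = Sampler(prior, likelihood, n_dim=2, pool=(Pool(4), 2))
    sampler.run(verbose=True)
```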
71 changes: 39 additions & 32 deletions nautilus/sampler.py
@@ -61,11 +61,10 @@ class Sampler():
pass_dict : bool
If True, the likelihood function expects model parameters as
dictionaries.
pool : object
pool_l : object
Pool used to parallelize likelihood calls.
n_jobs : int or string
Number of parallel jobs to use for neural network training and sampling
new points.
pool_s : object
Pool used to parallelize sampler calculations.
rng : np.random.Generator
Random number generator of the sampler.
n_like : int
@@ -109,7 +108,7 @@ def __init__(self, prior, likelihood, n_dim=None, n_live=2000,
prior_kwargs=dict(), likelihood_args=[],
likelihood_kwargs=dict(), n_batch=100,
n_like_new_bound=None, vectorized=False, pass_dict=None,
pool=None, n_jobs=1, seed=None, blobs_dtype=None,
pool=None, n_jobs=None, seed=None, blobs_dtype=None,
filepath=None, resume=True):
r"""
Initialize the sampler.
@@ -182,18 +181,19 @@ def __init__(self, prior, likelihood, n_dim=None, n_live=2000,
dictionaries. If False, it expects regular numpy arrays. Default is
to set it to True if prior was a nautilus.Prior instance and False
otherwise.
pool : object or int, optional
Object with a `map` function used for parallelization of likelihood
calls, e.g. a multiprocessing.Pool object, or a positive integer.
If it is an integer, it determines the number of workers in the
Pool. Default is None.
pool : None, object, int or tuple, optional
Pool used for parallelization of likelihood calls and sampler
calculations. If None, no parallelization is performed. If an
integer, the sampler will use a multiprocessing.Pool object with
the specified number of processes. Finally, if a tuple is passed,
the first entry is the pool used for likelihood calls and the second
the pool for sampler calculations. Default is None.
n_jobs : int or string, optional
Number of parallel jobs to use for neural network training and
sampling new points. If the string 'max' is passed, all available
cores are used. Default is 'max'.
seed : int, optional
Deprecated.
seed : None or int, optional
Seed for random number generation used for reproducible results
across different runs. Default is None.
across different runs. If None, results are not reproducible.
Default is None.
blobs_dtype : object or None, optional
Object that can be converted to a data type object describing the
blobs. If None, this will be inferred from the first blob. Default
@@ -263,16 +263,27 @@ def __init__(self, prior, likelihood, n_dim=None, n_live=2000,
self.vectorized = vectorized
self.pass_dict = pass_dict

if isinstance(pool, int):
self.pool = Pool(pool)
elif pool is not None:
self.pool = pool
if pool is None:
self.pool_l = None
self.pool_s = None
elif isinstance(pool, int):
self.pool_l = Pool(pool)
self.pool_s = self.pool_l
elif isinstance(pool, tuple):
self.pool_l = pool[0]
if isinstance(self.pool_l, int):
self.pool_l = Pool(self.pool_l)
self.pool_s = pool[1]
if isinstance(self.pool_s, int):
self.pool_s = Pool(self.pool_s)
else:
self.pool = None
self.pool_l = pool
self.pool_s = pool

if n_jobs == 'max':
n_jobs = cpu_count()
self.n_jobs = n_jobs
if n_jobs is not None:
warnings.warn(
"The 'n_jobs' keyword argument has been deprecated .Use " +
"'pool', instead.", DeprecationWarning, stacklevel=2)

self.rng = np.random.default_rng(seed)
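
For reference, the branching above reduces to a small normalization step. The helper below is a standalone illustration of that logic, not part of the nautilus API:

```python
# Illustrative standalone version of the pool handling in Sampler.__init__
# above; `normalize_pool` is not part of the nautilus API.
from multiprocessing import Pool


def normalize_pool(pool):
    """Map the `pool` argument to a (pool_l, pool_s) pair."""
    if pool is None:
        return None, None
    if isinstance(pool, int):
        shared = Pool(pool)
        return shared, shared  # one pool shared between both roles
    if isinstance(pool, tuple):
        pool_l, pool_s = pool
        if isinstance(pool_l, int):
            pool_l = Pool(pool_l)
        if isinstance(pool_s, int):
            pool_s = Pool(pool_s)
        return pool_l, pool_s  # independent pools for the two roles
    return pool, pool  # a single pool object is also shared
```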

@@ -354,7 +365,6 @@ def run(self, f_live=0.01, n_shell=None, n_eff=10000,
If True, print additional information. Default is False.
"""
self._pool = Pool(self.n_jobs) if self.n_jobs > 1 else None

if not self.explored:

@@ -416,9 +426,6 @@ def run(self, f_live=0.01, n_shell=None, n_eff=10000,

self.add_points(n_shell=n_shell, n_eff=n_eff, verbose=verbose)

if self.n_jobs > 1:
self._pool.close()

def posterior(self, return_as_dict=None, equal_weight=False,
return_blobs=False):
"""Return the posterior sample estimate.
@@ -572,7 +579,7 @@ def sample_shell(self, index, shell_t=None):
with threadpool_limits(limits=1):
while n_sample < self.n_batch:
points = self.bounds[index].sample(
self.n_batch - n_sample, pool=self._pool)
self.n_batch - n_sample, pool=self.pool_s)
n_bound += self.n_batch - n_sample

# Remove points that are actually in another shell.
@@ -654,8 +661,8 @@ def evaluate_likelihood(self, points):
result = self.likelihood(args)
if isinstance(result, tuple):
result = list(zip(*result))
elif self.pool is not None:
result = list(self.pool.map(self.likelihood, args))
elif self.pool_l is not None:
result = list(self.pool_l.map(self.likelihood, args))
else:
result = list(map(self.likelihood, args))

@@ -754,8 +761,8 @@ def add_bound(self, verbose=False):
split_threshold=self.split_threshold,
n_networks=self.n_networks,
neural_network_kwargs=self.neural_network_kwargs,
pool=self._pool, rng=self.rng)
bound.sample(1000, return_points=False, pool=self._pool)
pool=self.pool_s, rng=self.rng)
bound.sample(1000, return_points=False, pool=self.pool_s)
if bound.volume() > self.bounds[-1].volume():
bound = self.bounds[-1]
self.bounds.append(bound)
8 changes: 4 additions & 4 deletions tests/test_blobs.py
@@ -24,7 +24,7 @@ def likelihood(x):
return 1, dtype(10 * x[0])

sampler = Sampler(prior, likelihood, n_dim=2, n_live=10,
vectorized=vectorized, n_jobs=1)
vectorized=vectorized)
sampler.run(f_live=1.0, n_eff=200, discard_exploration=discard_exploration)
points, log_w, log_l, blobs = sampler.posterior(return_blobs=True)

@@ -46,7 +46,7 @@ def likelihood(x):
return 1, np.float32(x[0]), np.float32(x[1])

sampler = Sampler(prior, likelihood, n_dim=2, n_live=10,
vectorized=vectorized, n_jobs=1)
vectorized=vectorized)
sampler.run(f_live=1.0, n_eff=200, discard_exploration=discard_exploration)
points, log_w, log_l, blobs = sampler.posterior(return_blobs=True)

@@ -70,7 +70,7 @@ def likelihood(x):

blobs_dtype = [('a', '|S10'), ('b', np.int16)]
sampler = Sampler(prior, likelihood, n_dim=2, n_live=10,
vectorized=vectorized, n_jobs=1, blobs_dtype=blobs_dtype)
vectorized=vectorized, blobs_dtype=blobs_dtype)
sampler.run(f_live=1.0, n_eff=200, discard_exploration=discard_exploration)
points, log_w, log_l, blobs = sampler.posterior(return_blobs=True)

@@ -94,7 +94,7 @@ def likelihood(x):
return 1, x[0], x[1]

sampler = Sampler(prior, likelihood, n_dim=2, n_live=10,
vectorized=vectorized, n_jobs=1, blobs_dtype=float)
vectorized=vectorized, blobs_dtype=float)
sampler.run(f_live=1.0, n_eff=200, discard_exploration=discard_exploration)
points, log_w, log_l, blobs = sampler.posterior(return_blobs=True)
assert len(points) == len(blobs)
11 changes: 5 additions & 6 deletions tests/test_io.py
@@ -107,13 +107,13 @@ def likelihood(x):
return -np.linalg.norm(x - 0.5) * 0.001

sampler_write = Sampler(prior, likelihood, n_dim=2, n_live=100,
n_networks=n_networks, n_jobs=1,
filepath='test.hdf5', resume=False, seed=0)
n_networks=n_networks, filepath='test.hdf5',
resume=False, seed=0)
sampler_write.run(f_live=0.45, n_eff=0, verbose=True)
sampler_write.explored = False
sampler_read = Sampler(prior, likelihood, n_dim=2, n_live=100,
n_networks=n_networks, n_jobs=1,
filepath='test.hdf5', resume=True)
n_networks=n_networks, filepath='test.hdf5',
resume=True)
sampler_read.explored = False

sampler_write.run(f_live=0.45, n_eff=1000,
@@ -143,8 +143,7 @@ def likelihood(x):
return -np.linalg.norm(x - 0.5) * 0.001

sampler = Sampler(prior, likelihood, n_dim=2, n_live=100,
n_networks=1, n_jobs=1, filepath='test.hdf5',
resume=False, seed=0)
n_networks=1, filepath='test.hdf5', resume=False, seed=0)
sampler.run(n_eff=1000, discard_exploration=True)

assert Path('test.hdf5').is_file()
16 changes: 11 additions & 5 deletions tests/test_pool.py
@@ -20,18 +20,24 @@ def likelihood(x):

@pytest.mark.skipif(multiprocessing.get_start_method() == 'spawn',
reason=('pytest does not support spawning'))
@pytest.mark.parametrize("pool", [1, 2, Pool(2), Pool(3)])
@pytest.mark.parametrize("pool", [1, 2, Pool(2), Pool(3), None,
(2, 2), (None, 1), (2, Pool(2))])
def test_pool(pool):
# Test that the expected number of processes are run.

sampler = Sampler(prior, likelihood, n_dim=2, n_live=50, n_networks=1,
pool=pool, n_jobs=1)
pool=pool)
sampler.run(f_live=1.0, n_eff=0)
points, log_w, log_l, blobs = sampler.posterior(return_blobs=True)

if isinstance(pool, tuple):
pool = pool[0]

if isinstance(pool, int):
assert len(np.unique(blobs)) == pool
n_jobs = pool
elif pool is None:
n_jobs = 1
else:
assert len(np.unique(blobs)) == pool._processes
n_jobs = pool._processes

sampler.pool.close()
assert len(np.unique(blobs)) == n_jobs
11 changes: 5 additions & 6 deletions tests/test_sampler.py
@@ -28,7 +28,7 @@ def likelihood(x):

sampler = Sampler(
prior, likelihood, n_dim=2, n_networks=n_networks,
vectorized=vectorized, pass_dict=pass_dict, n_live=500, n_jobs=1)
vectorized=vectorized, pass_dict=pass_dict, n_live=500)
sampler.run(f_live=0.45, n_eff=0, verbose=False)
points, log_w, log_l = sampler.posterior(return_as_dict=pass_dict)

@@ -58,7 +58,7 @@ def likelihood(x):

sampler = Sampler(
prior, likelihood, n_dim=2, n_networks=1, vectorized=vectorized,
pass_dict=pass_dict, n_live=500, n_jobs=1)
pass_dict=pass_dict, n_live=500)
sampler.run(f_live=0.45, n_eff=0, verbose=False)
points, log_w, log_l = sampler.posterior(return_as_dict=pass_dict)
if custom_prior and pass_dict:
@@ -82,7 +82,7 @@ def likelihood(x):
return multivariate_normal.logpdf(x, mean=mean, cov=cov)

sampler = Sampler(prior, likelihood, n_dim=n_dim, n_live=500,
n_networks=n_networks, n_jobs=1, seed=0)
n_networks=n_networks, seed=0)
sampler.run(discard_exploration=discard_exploration, f_live=0.1,
verbose=False)

@@ -117,7 +117,7 @@ def likelihood(x):
return -np.linalg.norm(x - 0.5)**2 * 0.001

sampler = Sampler(prior, likelihood, n_dim=2, enlarge_per_dim=100,
n_networks=0, n_jobs=1, seed=0)
n_networks=0, seed=0)
sampler.run(f_live=0.1, n_eff=0)

# The effective sample size should be very close to the number of calls
@@ -153,8 +153,7 @@ def likelihood(x):
x_1 = np.random.normal(loc=0.5, scale=np.exp(20 * (x_0 - 0.5)) / 100)
log_z_true = np.log(np.mean((x_0 > 0) & (x_0 < 1) & (x_1 > 0) & (x_1 < 1)))

sampler = Sampler(prior, likelihood, n_dim=2, n_networks=1, n_jobs=1,
seed=0)
sampler = Sampler(prior, likelihood, n_dim=2, n_networks=1, seed=0)
sampler.run()
assert np.isclose(log_z_true, sampler.evidence(), rtol=0, atol=0.1)
# Check whether the boundaries nautilus drew are strictly nested.
