Parallelize working on Ensemble normalization #94

Merged · 3 commits · Jan 16, 2020
837 changes: 429 additions & 408 deletions notebooks/getting_started.ipynb

Large diffs are not rendered by default.

105 changes: 74 additions & 31 deletions ompy/ensemble.py
@@ -31,6 +31,9 @@
import logging
import matplotlib.pyplot as plt
import os
from pathos.multiprocessing import ProcessPool
from pathos.helpers import cpu_count
from itertools import repeat

from typing import Callable, Union, List, Optional, Any, Tuple
from pathlib import Path
@@ -51,6 +54,12 @@
class Ensemble:
"""Generates perturbated matrices to estimate uncertainty

Note:
When adding any functionality that runs within the parallelized loop,
make sure to use the random generator exposed via the arguments. If
one uses np.random instead, every process will work on an exact copy
of the same random state.
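A minimal sketch of the pattern described in this note, assuming NumPy >= 1.17; the variable names are illustrative only:

import numpy as np

# One parent sequence, seeded once for reproducibility
ss = np.random.SeedSequence(987654)

# One independent child stream per ensemble member / worker process
rngs = [np.random.default_rng(child) for child in ss.spawn(4)]

# Each generator yields a different, reproducible sequence, whereas the
# module-level np.random state is copied identically into forked workers
print([rng.poisson(10.0) for rng in rngs])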

Attributes:
raw (Matrix): The raw matrix used as model for the ensemble. If a
background is provided, this will initially be the
@@ -83,6 +92,9 @@ class Ensemble:
generated unfolded matrix. Defaults to NOP.
action_firstgen (Action[Matrix]): An arbitrary action to apply to each
generated firstgen matrix. Defaults to NOP.
nprocesses (int): Number of processes for multiprocessing.
Defaults to the number of available cpus - 1 (minimum 1).
seed (int): Random seed for reproducibility of results


TODO:
@@ -131,6 +143,9 @@ def __init__(self, raw: Optional[Matrix] = None,
self.unfolded_ensemble: Optional[Matrix] = None
self.firstgen_ensemble: Optional[Matrix] = None

self.seed: int = 987654
self.nprocesses: int = cpu_count()-1 if cpu_count() > 1 else 1

if path is not None:
self.path = Path(path)
self.path.mkdir(exist_ok=True)
@@ -201,34 +216,26 @@ def generate(self, number: int, method: str = 'poisson',

self.size = number
self.regenerate = regenerate
raw_ensemble = np.zeros((number, *self.raw.shape))
unfolded_ensemble = np.zeros_like(raw_ensemble)
firstgen_ensemble = np.zeros_like(raw_ensemble)

for step in tqdm(range(number)):
LOG.info(f"Generating {step}")
if self.bg is not None:
prompt_w_bg = self.generate_perturbed(step, method,
state="prompt+bg")
bg = self.generate_perturbed(step, method, state="bg")
raw = self.subtract_bg(step, prompt_w_bg, bg)
else:
raw = self.generate_perturbed(step, method, state="raw")
unfolded = self.unfold(step, raw)
firstgen = self.first_generation(step, unfolded)

raw_ensemble[step, :, :] = raw.values
unfolded_ensemble[step, :, :] = unfolded.values
LOG.info(f"Start normalization with {self.nprocesses} cpus")
pool = ProcessPool(nodes=self.nprocesses)
ss = np.random.SeedSequence(self.seed)
iterator = pool.imap(self.step, range(number), ss.spawn(number),
repeat(method))
ensembles = np.array(list(tqdm(iterator, total=number)))
pool.close()
pool.join()
pool.clear()

# TODO The first generation method might reshape the matrix
if firstgen_ensemble.shape[1:] != firstgen.shape:
firstgen_ensemble = np.zeros((number, *firstgen.shape))
self.firstgen = firstgen
firstgen_ensemble[step, :, :] = firstgen.values
raw_ensemble = ensembles[:, 0, :, :]
unfolded_ensemble = ensembles[:, 1, :, :]
firstgen_ensemble = ensembles[:, 2, :, :]

# TODO Move this to a save step
self.raw.save(self.path / 'raw.npy')
self.firstgen.save(self.path / 'firstgen.npy')
# firstgen is saved to disk inside step() due to pickling
self.firstgen = Matrix(path=self.path / 'firstgen.npy')

# Calculate standard deviation
raw_ensemble_std = np.std(raw_ensemble, axis=0)
raw_std = Matrix(raw_ensemble_std, self.raw.Eg, self.raw.Ex,
@@ -253,8 +260,39 @@ def generate(self, number: int, method: str = 'poisson',
self.unfolded_ensemble = unfolded_ensemble
self.firstgen_ensemble = firstgen_ensemble

def step(self, step: int, rsequence: np.random.SeedSequence,
method: str):
"""Single step in `self.generate`

Args:
step (int): step (loop) number
rsequence (np.random.SeedSequence): SeedSequence for random seed
method (str): see `self.generate`

Returns:
raw, unfolded, firstgen
"""
LOG.info(f"Generating {step}")
rstate = np.random.default_rng(rsequence)
if self.bg is not None:
prompt_w_bg = self.generate_perturbed(step, method,
state="prompt+bg",
rstate=rstate)
bg = self.generate_perturbed(step, method, state="bg",
rstate=rstate)
raw = self.subtract_bg(step, prompt_w_bg, bg)
else:
raw = self.generate_perturbed(step, method, state="raw",
rstate=rstate)
unfolded = self.unfold(step, raw)
firstgen = self.first_generation(step, unfolded)

if step == 0:  # workaround: save so that generate() can reload firstgen in the parent process
firstgen.save(self.path / 'firstgen.npy')
return raw.values, unfolded.values, firstgen.values
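A toy sketch of the dispatch and stacking used in generate(), assuming every step returns three arrays of identical shape; toy_step below stands in for Ensemble.step and is not part of ompy:

import numpy as np
from itertools import repeat
from pathos.multiprocessing import ProcessPool

def toy_step(step, seed_seq, method):
    # Stand-in for Ensemble.step: return three 2x2 "matrices"
    rng = np.random.default_rng(seed_seq)
    base = np.full((2, 2), 10.0)
    return rng.poisson(base), rng.poisson(base), rng.poisson(base)

if __name__ == "__main__":
    number = 5
    ss = np.random.SeedSequence(987654)
    pool = ProcessPool(nodes=2)
    # pathos' imap walks several iterables in lock-step, like the builtin map
    results = list(pool.imap(toy_step, range(number), ss.spawn(number),
                             repeat("poisson", number)))
    pool.close()
    pool.join()
    pool.clear()

    ensembles = np.array(results)   # shape (number, 3, 2, 2)
    raw = ensembles[:, 0, :, :]
    unfolded = ensembles[:, 1, :, :]
    firstgen = ensembles[:, 2, :, :]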

def generate_perturbed(self, step: int, method: str, state: str) -> Matrix:
def generate_perturbed(self, step: int, method: str, state: str,
rstate: np.random.Generator) -> Matrix:
"""Generate a perturbated matrix

Looks for an already generated file before generating the matrix.
@@ -264,6 +302,7 @@ def generate_perturbed(self, step: int, method: str, state: str) -> Matrix:
method: The name of the method to use. Can be either
"gaussian" or "poisson"
state: Either "raw", "prompt+bg" or "bg".
rstate: numpy random generator (random state)

Returns:
The generated matrix
@@ -281,9 +320,9 @@ def generate_perturbed(self, step: int, method: str, state: str) -> Matrix:
# "remove negatives")
LOG.debug(f"(Re)generating {path} using {method} process")
if method == 'gaussian':
values = self.generate_gaussian(state)
values = self.generate_gaussian(state, rstate)
elif method == 'poisson':
values = self.generate_poisson(state)
values = self.generate_poisson(state, rstate)
else:
raise ValueError(f"Method {method} is not supported")
base_mat = self.matrix_from_state(state)
@@ -406,33 +445,37 @@ def rebin(self, out_array: np.ndarray, member: str) -> None:
self.firstgen_ensemble = rebinned
self.std_firstgen = firstgen_std

def generate_gaussian(self, state: str) -> np.ndarray:
def generate_gaussian(self, state: str,
rstate: Optional[np.random.Generator] = np.random.default_rng()) -> np.ndarray: # noqa
"""Generates an array with Gaussian perturbations of a matrix.
Note that entries are truncated at 0 (no negative entries).

Args:
state: State of the matrix, i.e. which matrix should be taken as the base
rstate (optional): numpy random generator (random state)
Returns:
The resulting array
"""
mat = self.matrix_from_state(state)
std = np.sqrt(np.where(mat.values > 0, mat.values, 0))
perturbed = np.random.normal(size=mat.shape, loc=mat.values,
scale=std)
perturbed = rstate.normal(size=mat.shape, loc=mat.values,
scale=std)
perturbed[perturbed < 0] = 0
return perturbed

def generate_poisson(self, state: str) -> np.ndarray:
def generate_poisson(self, state: str,
rstate: Optional[np.random.Generator] = np.random.default_rng()) -> np.ndarray: # noqa
"""Generates an array with Poisson perturbations of a matrix

Args:
state: State of the matrix, i.e. which matrix should be taken as the base
rstate (optional): numpy random generator (random state)
Returns:
The resulting array
"""
mat = self.matrix_from_state(state)
std = np.where(mat.values > 0, mat.values, 0)
perturbed = np.random.poisson(std)
perturbed = rstate.poisson(std)
return perturbed
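A self-contained illustration of the two perturbation modes implemented above; the count matrix is made up and the generator is seeded explicitly so the draws are reproducible:

import numpy as np

rstate = np.random.default_rng(42)
counts = np.array([[100.0, 4.0],
                   [0.0, 25.0]])   # made-up raw counts

# Gaussian: sigma = sqrt(counts), negative draws truncated to zero afterwards
std = np.sqrt(np.where(counts > 0, counts, 0))
gaussian = rstate.normal(loc=counts, scale=std, size=counts.shape)
gaussian[gaussian < 0] = 0

# Poisson: each bin drawn with lambda = counts (negative bins clamped to zero)
poisson = rstate.poisson(np.where(counts > 0, counts, 0))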

def load_matrix(self, path: Union[Path, str]) -> Union[Matrix, None]:
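A hedged usage sketch of the new attributes; the ensemble instance and the rest of its configuration (raw matrix, unfolder, first-generation method) are assumed to be set up elsewhere and are not shown in this diff:

# `ensemble` is assumed to be an already configured ompy.Ensemble instance
ensemble.nprocesses = 4      # override the default of cpu_count() - 1
ensemble.seed = 123456       # default seed is 987654
ensemble.generate(50, method="poisson")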
59 changes: 44 additions & 15 deletions ompy/ensembleNormalizer.py
@@ -8,6 +8,8 @@
from scipy.stats import norm as scipynorm
import copy
from itertools import repeat
from pathos.multiprocessing import ProcessPool
from pathos.helpers import cpu_count

from .models import ResultsNormalized
from .vector import Vector
@@ -40,12 +42,22 @@ class EnsembleNormalizer:
normalizer_nld=...,
normalizer_gsf=...)

Note:
If one should add a functionality that depends on random numbers
withing the parallelized loop make sure to use the random generator
exposed via the arguments (see Ensemble class for an example). If one
uses np.random instead, this will be the same an exact copy for each
process. Note that this is not an issue for multinest seach routine,
which is anyhow seeded by default as implemented in ompy.

Attributes:
extractor (Extractor): Extractor instance
normalizer_nld (NormalizerNLD): NormalizerNLD instance
normalizer_gsf (NormalizerGSF): NormalizerGSF instance
normalizer_simultan (NormalizerSimultan): NormalizerSimultan instance
res (List[ResultsNormalized]): List of the results
nprocesses (int): Number of processes for multiprocessing.
Defaults to the number of available cpus - 1 (minimum 1).
"""

def __init__(self, *, extractor: Extractor,
@@ -67,6 +79,8 @@ def __init__(self, *, extractor: Extractor,

self.normalizer_simultan = normalizer_simultan

self.nprocesses: int = cpu_count()-1 if cpu_count() > 1 else 1

self.res: Optional[List[ResultsNormalized]] = None

def normalize(self) -> None:
@@ -80,24 +94,39 @@ def normalize(self) -> None:
gsfs = self.extractor.gsf
nlds = self.extractor.nld

# reset
self.res = []
LOG.info(f"Start normalization with {self.nprocesses} cpus")
pool = ProcessPool(nodes=self.nprocesses)
N = len(nlds)
iterator = pool.imap(self.step, range(N), nlds, gsfs)
self.res = list(tqdm(iterator, total=N))
pool.close()
pool.join()
pool.clear()

# tqdm should be innermost (see github))
for i, (nld, gsf) in enumerate(zip(tqdm(nlds), gsfs)):
LOG.info(f"\n\n---------\nNormalizing #{i}")
nld.copy()
nld.cut_nan()
def step(self, i: int, nld: Vector, gsf: Vector):
""" Normalization step for each ensemble member

gsf.copy()
gsf.cut_nan()
Args:
i (int): Loop number
nld (Vector): NLD before normalization
gsf (Vector): gsf before normalization

if self.normalizer_simultan is not None:
res = self.normalizeSimultan(i, nld=nld, gsf=gsf)
else:
res = self.normalizeStagewise(i, nld=nld, gsf=gsf)
Returns:
res (ResultsNormalized): results (parameters) of the normalization
"""
LOG.info(f"\n\n---------\nNormalizing #{i}")
nld.copy()
nld.cut_nan()

gsf.copy()
gsf.cut_nan()

if self.normalizer_simultan is not None:
res = self.normalizeSimultan(i, nld=nld, gsf=gsf)
else:
res = self.normalizeStagewise(i, nld=nld, gsf=gsf)

self.res.append(res)
return res

def normalizeSimultan(self, num: int, *,
nld: Vector, gsf: Vector) -> ResultsNormalized:
Expand Down Expand Up @@ -128,7 +157,7 @@ def normalizeStagewise(self, num: int, *,
"""
self.normalizer_nld.normalize(nld=nld, num=num)
self.normalizer_gsf.normalize(normalizer_nld=self.normalizer_nld,
gsf=gsf)
gsf=gsf, num=num)

# sample B from the gaussian uncertainty for each nld
B = self.normalizer_gsf.res.pars["B"]
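Correspondingly, a hedged sketch of driving the parallel normalization; extractor, normalizer_nld and normalizer_gsf are assumed to be configured instances as in the class docstring above:

import ompy

normalizer = ompy.EnsembleNormalizer(extractor=extractor,
                                     normalizer_nld=normalizer_nld,
                                     normalizer_gsf=normalizer_gsf)
normalizer.nprocesses = 4   # defaults to cpu_count() - 1
normalizer.normalize()
results = normalizer.res    # List[ResultsNormalized], one entry per ensemble member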
5 changes: 3 additions & 2 deletions requirements.txt
@@ -1,9 +1,10 @@
cython
numpy
numpy>=1.17
pandas
matplotlib
termtables
pymultinest
scipy
uncertainties
uncertainties>=3.0.3
tqdm
pathos