Skip to content

Commit

Permalink
Enable pickling of normalizer instances via dill
Browse files Browse the repository at this point in the history
Created an abstract class to contain save and load for all normalization
classes / instances.

For `Extractor`, the `__getstate__` method is modified such that it
excludes the `ensemble` attribute to save space.
  • Loading branch information
fzeiser authored and fzeiser committed Mar 20, 2020
1 parent 09b7f90 commit 896b352
Show file tree
Hide file tree
Showing 6 changed files with 222 additions and 123 deletions.
65 changes: 65 additions & 0 deletions ompy/abstract_normalizer.py
@@ -0,0 +1,65 @@
import logging
from pathlib import Path
from typing import Optional, Union
import dill


class AbstractNormalizer():
""" Abstract class for Matrix and Vector.
Do not initialize itself.
Attributes:
path (Optional[Union[str, Path]]): Path to save/load
regenerate (bool): If `True`, calculates again instead of
loading from `path`.
_save_instance (bool): Can oversteer saving
"""
LOG = logging.getLogger(__name__)

def __init__(self, regenerate: bool = True):
self.regenerate = regenerate
self._save_instance: bool = True

def save(self, path: Optional[Union[str, Path]] = None,
overwrite: bool = True):
""" Save (pickels) the instance
Such that it can be loaded, and enabling the `regenerate` later.
Args:
path: The path to the save directory. If the
value is None, 'self.path' will be used.
overwrite: Overwrite file if existent
"""
if not self._save_instance:
return
path = Path(path) if path is not None else Path(self.path)
mode = 'wb' if overwrite else 'xb'
fname = (path / type(self).__name__).with_suffix('.pkl')
self.LOG.info(f"Saving to {fname}")
with open(fname, mode) as fobj:
dill.dump(self, fobj)

def load(self, path: Optional[Union[str, Path]] = None):
""" Loads (pickeled) instance.
Such that it can be loaded if `regenerate = False`.
Note that if any modifications of the __getstate__ method are present,
these will effect what attributes are pickeled.
Args:
path: The path to the directoryto load file. If the
value is None, 'self.path' will be used.
Raises:
FileNotFoundError: If file is not found
"""
path = Path(path) if path is not None else Path(self.path)
fname = (path / type(self).__name__).with_suffix('.pkl')

self.LOG.info(f"Try loading from {fname}")
with open(fname, "rb") as fobj:
saved = dill.load(fobj)
self.__dict__.update(saved.__dict__)
self.LOG.info(f"Loaded")
44 changes: 33 additions & 11 deletions ompy/ensembleNormalizer.py
Expand Up @@ -11,24 +11,24 @@
from itertools import repeat
from pathos.multiprocessing import ProcessPool
from pathos.helpers import cpu_count
from pathlib import Path

from .abstract_normalizer import AbstractNormalizer
from .models import ResultsNormalized
from .vector import Vector
from .extractor import Extractor
from .normalizer_nld import NormalizerNLD
from .normalizer_gsf import NormalizerGSF
from .normalizer_simultan import NormalizerSimultan


if 'JPY_PARENT_PID' in os.environ:
from tqdm import tqdm_notebook as tqdm
else:
from tqdm import tqdm

LOG = logging.getLogger(__name__)
logging.captureWarnings(True)


class EnsembleNormalizer:
class EnsembleNormalizer(AbstractNormalizer):
"""Normalizes NLD nad γSF extracted from the ensemble
Usage:
Expand Down Expand Up @@ -60,11 +60,15 @@ class EnsembleNormalizer:
nprocesses (int): Number of processes for multiprocessing.
Defaults to number of available cpus-1 (with mimimum 1).
"""
LOG = logging.getLogger(__name__) # overwrite parent variable
logging.captureWarnings(True)

def __init__(self, *, extractor: Extractor,
normalizer_nld: Optional[NormalizerNLD] = None,
normalizer_gsf: Optional[NormalizerGSF] = None,
normalizer_simultan: Optional[NormalizerSimultan] = None):
normalizer_simultan: Optional[NormalizerSimultan] = None,
path: Optional[Union[str, Path]] = None,
regenerate: bool = False):
"""
Args:
extractor (Extractor): Extractor instance
Expand All @@ -73,19 +77,30 @@ def __init__(self, *, extractor: Extractor,
normalizer_simultan (NormalizerSimultan, optional):
NormalizerSimultan instance
"""
super().__init__(regenerate)
self.extractor = extractor

self.normalizer_nld = normalizer_nld
self.normalizer_gsf = normalizer_gsf
self.normalizer_nld = copy.deepcopy(normalizer_nld)
self.normalizer_gsf = copy.deepcopy(normalizer_gsf)

self.normalizer_simultan = normalizer_simultan
self.normalizer_simultan = copy.deepcopy(normalizer_simultan)

self.nprocesses: int = cpu_count()-1 if cpu_count() > 1 else 1

self.res: Optional[List[ResultsNormalized]] = None

self.path = Path(path) if path is not None else Path('normalizers')
self.path.mkdir(exist_ok=True)

def normalize(self) -> None:
""" Normalize ensemble """
if not self.regenerate:
try:
self.load()
return
except FileNotFoundError:
pass

assert xor((self.normalizer_nld is not None
and self.normalizer_gsf is not None),
self.normalizer_simultan is not None), \
Expand All @@ -95,7 +110,7 @@ def normalize(self) -> None:
gsfs = self.extractor.gsf
nlds = self.extractor.nld

LOG.info(f"Start normalization with {self.nprocesses} cpus")
self.LOG.info(f"Start normalization with {self.nprocesses} cpus")
pool = ProcessPool(nodes=self.nprocesses)
N = len(nlds)
iterator = pool.imap(self.step, range(N), nlds, gsfs)
Expand All @@ -104,6 +119,8 @@ def normalize(self) -> None:
pool.join()
pool.clear()

self.save()

def step(self, i: int, nld: Vector, gsf: Vector):
""" Normalization step for each ensemble member
Expand All @@ -115,7 +132,7 @@ def step(self, i: int, nld: Vector, gsf: Vector):
Returns:
res (ResultsNormalized): results (/parameters) of normalization
"""
LOG.info(f"\n\n---------\nNormalizing #{i}")
self.LOG.info(f"\n\n---------\nNormalizing #{i}")
nld.copy()
nld.cut_nan()

Expand All @@ -141,6 +158,7 @@ def normalizeSimultan(self, num: int, *,
Returns:
res (ResultsNormalized): results (/parameters) of normalization
"""
self.normalizer_simultan._save_instance = False
self.normalizer_simultan.normalize(gsf=gsf, nld=nld, num=num)
return self.normalizer_simultan.res

Expand All @@ -156,6 +174,10 @@ def normalizeStagewise(self, num: int, *,
Returns:
res (ResultsNormalized): results (/parameters) of normalization
"""
for norm in [self.normalizer_nld, self.normalizer_gsf]:
norm._save_instance = False
norm.regenerate = True

self.normalizer_nld.normalize(nld=nld, num=num)
self.normalizer_gsf.normalize(normalizer_nld=self.normalizer_nld,
gsf=gsf, num=num)
Expand All @@ -173,7 +195,7 @@ def plot(self, ax: Tuple[Any, Any] = None,
n_plot: bool = 5,
plot_model_stats: bool = False,
random_state: Optional[np.random.RandomState] = None,
return_stats: bool =False,
return_stats: bool = False,
**kwargs) -> Union[Tuple[Any, Any],
Tuple[Any, Any, Tuple[Any, Any]]]:
"""Plots randomly drawn samples
Expand Down
8 changes: 7 additions & 1 deletion ompy/extractor.py
Expand Up @@ -39,7 +39,7 @@ class Extractor:
Attributes:
ensemble (Ensemble): The Ensemble instance to extract nld and gsf from.
regenerate (bool): Whether to force extraction from matrices even if
previous results are found on disk. Defaults to True
previous results are found on disk. Defaults to False
method (str): The scipy.minimization method to use. Defaults to Powell.
options (dict): The scipy.minimization options to use.
nld (list[Vector]): The nuclear level densities extracted.
Expand Down Expand Up @@ -578,6 +578,12 @@ def ensemble_gsf(self) -> Vector:
std = self.gsf_std()
return Vector(values=values, E=energy, std=std)

def __getstate__(self):
""" `__getstate__` excluding `ensemble` attribute to save space """
state = self.__dict__.copy()
del state['ensemble']
return state


def normalize(mat: Matrix,
std: Optional[Matrix]) -> Tuple[np.ndarray, np.ndarray]:
Expand Down
44 changes: 32 additions & 12 deletions ompy/normalizer_gsf.py
Expand Up @@ -3,19 +3,19 @@
import copy
from numpy import ndarray
from typing import Optional, Tuple, Any, Callable, Union
from pathlib import Path
import matplotlib.pyplot as plt

from .abstract_normalizer import AbstractNormalizer
from .library import log_interp1d, self_if_none
from .models import ResultsNormalized, ExtrapolationModelLow,\
ExtrapolationModelHigh, NormalizationParameters
from .normalizer_nld import NormalizerNLD
from .spinfunctions import SpinFunctions
from .vector import Vector

LOG = logging.getLogger(__name__)


class NormalizerGSF():
class NormalizerGSF(AbstractNormalizer):

"""Normalize γSF to a given` <Γγ> (Gg)
Expand All @@ -37,15 +37,20 @@ class NormalizerGSF():
norm_pars (NormalizationParameters): Normalization parameters like
experimental <Γγ>
method_Gg (str): Method to use for the <Γγ> integral
path (Path): The path save the results.
res (ResultsNormalized): Results
"""
LOG = logging.getLogger(__name__) # overwrite parent variable
logging.captureWarnings(True)

def __init__(self, *, normalizer_nld: Optional[NormalizerNLD] = None,
nld: Optional[Vector] = None,
nld_model: Optional[Callable[..., Any]] = None,
alpha: Optional[float] = None,
gsf: Optional[Vector] = None,
path: Optional[Union[str, Path]] = None,
regenerate: bool = False,
norm_pars: Optional[NormalizationParameters] = None,
) -> None:
"""
Expand Down Expand Up @@ -78,6 +83,8 @@ def __init__(self, *, normalizer_nld: Optional[NormalizerNLD] = None,
Normalization parameters like experimental <Γγ>
"""
super().__init__(regenerate)

# use self._gsf internally, but separate from self._gsf
# in order to not trasform self.gsf_in 2x, if normalize() is called 2x
self.gsf_in = None if gsf is None else gsf.copy()
Expand Down Expand Up @@ -108,6 +115,9 @@ def __init__(self, *, normalizer_nld: Optional[NormalizerNLD] = None,
self._saved_spincutPars = None
self._saved_SpinSum_args = None

self.path = Path(path) if path is not None else Path('normalizers')
self.path.mkdir(exist_ok=True)

def normalize(self, *, gsf: Optional[Vector] = None,
normalizer_nld: Optional[NormalizerNLD] = None,
alpha: Optional[float] = None,
Expand All @@ -131,6 +141,14 @@ def normalize(self, *, gsf: Optional[Vector] = None,
Normalization parameters like experimental <Γγ>
num (Optional[int], optional): Loop number, defaults to 0.
"""
if not self.regenerate:
try:
print("regenerate is self.regenerate")
self.load()
return
except FileNotFoundError:
pass

# Update internal state
if gsf is None:
gsf = self.gsf_in
Expand All @@ -139,21 +157,21 @@ def normalize(self, *, gsf: Optional[Vector] = None,
normalizer_nld = self.self_if_none(normalizer_nld, nonable=True)

if nld is None:
LOG.debug("Setting nld from from normalizer_nld")
self.LOG.debug("Setting nld from from normalizer_nld")
self.nld = normalizer_nld.res.nld
else:
self.nld = self.self_if_none(nld)

alpha = self.self_if_none(alpha, nonable=True)
if alpha is None:
LOG.debug("Setting alpha from from normalizer_nld")
self.LOG.debug("Setting alpha from from normalizer_nld")
alpha = normalizer_nld.res.pars["alpha"][0]
assert alpha is not None, \
"Provide alpha or normalizer_nld with alpha"

self.nld_model = self.self_if_none(nld_model, nonable=True)
if nld_model is None:
LOG.debug("Setting nld_model from from normalizer_nld")
self.LOG.debug("Setting nld_model from from normalizer_nld")
self.nld_model = normalizer_nld.res.nld_model
assert alpha is not None, \
"Provide nld_model or normalizer_nld with nld_model"
Expand All @@ -167,7 +185,7 @@ def normalize(self, *, gsf: Optional[Vector] = None,
else:
self.res = ResultsNormalized(name="Results NLD and GSF, stepwise")

LOG.info(f"Normalizing #{num}")
self.LOG.info(f"Normalizing #{num}")
self._gsf = gsf.copy() # make a copy as it will be transformed
gsf.to_MeV()
gsf = gsf.transform(alpha=alpha, inplace=False)
Expand All @@ -187,7 +205,7 @@ def normalize(self, *, gsf: Optional[Vector] = None,
self.model_low.norm_to_shift_after(B_norm)
self.model_high.norm_to_shift_after(B_norm)

# save results
# transfer results
self.res.gsf = self._gsf
self.res.gsf_model_low = copy.deepcopy(self.model_low)
self.res.gsf_model_high = copy.deepcopy(self.model_high)
Expand All @@ -196,6 +214,8 @@ def normalize(self, *, gsf: Optional[Vector] = None,
else:
self.res.pars = {"B": [B_norm, B_norm_unc]}

self.save() # saves instance

def extrapolate(self,
gsf: Optional[Vector] = None,
E: Optional[np.ndarray] = [None, None]) -> Tuple[Vector, Vector]:
Expand All @@ -215,14 +235,14 @@ def extrapolate(self,
gsf = self._gsf

if self.model_low.method == "fit":
LOG.debug("Fitting extrapolation parameters")
self.LOG.debug("Fitting extrapolation parameters")
self.model_low.fit(gsf)
self.model_high.fit(gsf)

LOG.debug("Extrapolating low: %s", self.model_low)
self.LOG.debug("Extrapolating low: %s", self.model_low)
extrapolated_low = self.model_low.extrapolate(scaled=False, E=E[0])

LOG.debug("Extrapolating high: %s", self.model_high)
self.LOG.debug("Extrapolating high: %s", self.model_high)
extrapolated_high = self.model_high.extrapolate(scaled=False, E=E[1])
return extrapolated_low, extrapolated_high

Expand All @@ -232,7 +252,7 @@ def Gg_before_norm(self) -> float:
Returns:
Gg (float): <Γγ> before normalization, in meV
"""
LOG.debug("Method to compute Gg: %s", self.method_Gg)
self.LOG.debug("Method to compute Gg: %s", self.method_Gg)
if self.method_Gg == "standard":
return self.Gg_standard()
else:
Expand Down

0 comments on commit 896b352

Please sign in to comment.