Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] feat: Personal applicataion of custom modifiers #2398

Draft
wants to merge 1 commit into
base: feat/addCustomModifierPython
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 70 additions & 149 deletions src/pyhf/experimental/modifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from pyhf import events, get_backend
from pyhf.parameters import ParamViewer

import numpy as np

log = logging.getLogger(__name__)

__all__ = ["add_custom_modifier"]
Expand Down Expand Up @@ -35,81 +37,83 @@ class BaseBuilder:
...


def _allocate_new_param(
p: dict[str, Sequence[float]]
) -> dict[str, str | bool | int | Sequence[float]]:
return {
"paramset_type": "unconstrained",
"n_parameters": 1,
"is_shared": True,
"inits": p["inits"],
"bounds": p["bounds"],
"is_scalar": True,
"fixed": False,
}


def make_func(expression: str, deps: list[str]) -> Callable[[Sequence[float]], Any]:
def func(d: Sequence[float]) -> Any:
return ne.evaluate(expression, local_dict=dict(zip(deps, d)))

return func
def add(funcname, par_names, newparams, input_set=None, namespace=None):
namespace = namespace or {}

def make_func(
expression: str, namespace=namespace
) -> Callable[[Sequence[float]], Any]:
def func(deps: Sequence[float]) -> Any:
if expression in namespace:
parvals = dict(zip(par_names, deps))
return namespace[expression](parvals)()
return ne.evaluate(
expression, local_dict=dict(zip(par_names, deps), **namespace)
)

def make_builder(
func_name: str, deps: list[str], new_params: dict[str, dict[str, Sequence[float]]]
) -> BaseBuilder:
class _builder(BaseBuilder):
is_shared = False
return func

def _allocate_new_param(p):
param_dict = {
'paramset_type': p['paramset_type']
if 'paramset_type' in p.keys()
else 'unconstrained',
'n_parameters': 1,
'is_shared': True,
'inits': p['inits'],
'bounds': p['bounds'],
'is_scalar': True,
'fixed': False,
'auxdata': p['auxdata'] if 'auxdata' in p.keys() else (0.0,),
}
return param_dict

class _builder:
is_shared = True

def __init__(self, config):
self.builder_data = {"funcs": {}}
self.builder_data = {'funcs': {}}
self.config = config
self.required_parsets = {}

def collect(self, thismod, nom):
maskval = bool(thismod)
maskval = True if thismod else False
mask = [maskval] * len(nom)
return {"mask": mask}
return {'mask': mask}

def append(self, key, channel, sample, thismod, defined_sample):
def append(self, key, channel, sample, thismod, defined_samp):
self.builder_data.setdefault(key, {}).setdefault(sample, {}).setdefault(
"data", {"mask": []}
'data', {'mask': []}
)
nom = (
defined_sample["data"]
if defined_sample
defined_samp['data']
if defined_samp
else [0.0] * self.config.channel_nbins[channel]
)
mod_data = self.collect(thismod, nom)
self.builder_data[key][sample]["data"]["mask"] += mod_data["mask"]
moddata = self.collect(thismod, nom)
self.builder_data[key][sample]['data']['mask'] += moddata['mask']
if thismod:
if thismod["name"] != func_name:
self.builder_data["funcs"].setdefault(
thismod["name"], thismod["data"]["expr"]
if thismod['name'] != funcname:
self.builder_data['funcs'].setdefault(
thismod['name'], thismod['data']['expr']
)
self.required_parsets = {
k: [_allocate_new_param(v)] for k, v in new_params.items()
k: [_allocate_new_param(v)] for k, v in newparams.items()
}

def finalize(self):
return self.builder_data

return _builder


def make_applier(
func_name: str, deps: list[str], new_params: dict[str, dict[str, Sequence[float]]]
) -> BaseApplier:
class _applier(BaseApplier):
name = func_name
op_code = "multiplication"
class _applier:
name = funcname
op_code = 'addition'

def __init__(self, modifiers, pdfconfig, builder_data, batch_size=None):
self.funcs = [make_func(v, deps) for v in builder_data["funcs"].values()]
self.funcs = [make_func(f) for f in builder_data['funcs'].values()]

self.batch_size = batch_size
pars_for_applier = deps
_mod_names = [f"{mtype}/{m}" for m, mtype in modifiers]
pars_for_applier = par_names
_modnames = [f'{mtype}/{m}' for m, mtype in modifiers]

parfield_shape = (
(self.batch_size, pdfconfig.npars)
Expand All @@ -119,25 +123,25 @@ def __init__(self, modifiers, pdfconfig, builder_data, batch_size=None):
self.param_viewer = ParamViewer(
parfield_shape, pdfconfig.par_map, pars_for_applier
)
self._custom_mod_mask = [
[[builder_data[mod_name][s]["data"]["mask"]] for s in pdfconfig.samples]
for mod_name in _mod_names
self._custommod_mask = [
[[builder_data[modname][s]['data']['mask']] for s in pdfconfig.samples]
for modname in _modnames
]
self._precompute()
events.subscribe("tensorlib_changed")(self._precompute)
events.subscribe('tensorlib_changed')(self._precompute)

def _precompute(self):
tensorlib, _ = get_backend()
if not self.param_viewer.index_selection:
return
self.custom_mod_mask = tensorlib.tile(
tensorlib.astensor(self._custom_mod_mask),
self.custommod_mask = tensorlib.tile(
tensorlib.astensor(self._custommod_mask),
(1, 1, self.batch_size or 1, 1),
)
self.custom_mod_mask_bool = tensorlib.astensor(
self.custom_mod_mask, dtype="bool"
self.custommod_mask_bool = tensorlib.astensor(
self.custommod_mask, dtype="bool"
)
self.custom_mod_default = tensorlib.ones(self.custom_mod_mask.shape)
self.custommod_default = tensorlib.zeros(self.custommod_mask.shape)

def apply(self, pars):
"""
Expand All @@ -147,100 +151,17 @@ def apply(self, pars):
if not self.param_viewer.index_selection:
return
tensorlib, _ = get_backend()
if self.batch_size is None:
deps = self.param_viewer.get(pars)
results = tensorlib.astensor([f(deps) for f in self.funcs])
results = tensorlib.einsum(
"msab,m->msab", self.custom_mod_mask, results
)
else:
deps = self.param_viewer.get(pars)
results = tensorlib.astensor([f(deps) for f in self.funcs])
results = tensorlib.einsum(
"msab,ma->msab", self.custom_mod_mask, results
)
deps = self.param_viewer.get(pars)
out = tensorlib.astensor([f(deps) for f in self.funcs])
results = np.zeros_like(self.custommod_mask)
np.place(results, self.custommod_mask, out)
results = tensorlib.where(
self.custom_mod_mask_bool, results, self.custom_mod_default
self.custommod_mask_bool, results, self.custommod_default
)
return results

return _applier


def add_custom_modifier(
func_name: str, deps: list[str], new_params: dict[str, dict[str, Sequence[float]]]
) -> dict[str, tuple[BaseBuilder, BaseApplier]]:
r"""
Add a custom modifier type with the modifier data defined through a custom
numexpr string expression.

Example:

>>> import pyhf
>>> import pyhf.experimental.modifiers
>>> pyhf.set_backend("numpy")
>>> new_params = {
... "m1": {"inits": (1.0,), "bounds": ((-5.0, 5.0),)},
... "m2": {"inits": (1.0,), "bounds": ((-5.0, 5.0),)},
... }
>>> expanded_pyhf = pyhf.experimental.modifiers.add_custom_modifier(
... "custom", ["m1", "m2"], new_params
... )
>>> model = pyhf.Model(
... {
... "channels": [
... {
... "name": "singlechannel",
... "samples": [
... {
... "name": "signal",
... "data": [10, 20],
... "modifiers": [
... {
... "name": "f2",
... "type": "custom",
... "data": {"expr": "m1"},
... },
... ],
... },
... {
... "name": "background",
... "data": [100, 150],
... "modifiers": [
... {
... "name": "f1",
... "type": "custom",
... "data": {"expr": "m1+(m2**2)"},
... },
... ],
... },
... ],
... }
... ]
... },
... modifier_set=expanded_pyhf,
... poi_name="m1",
... validate=False,
... batch_size=1,
... )
>>> model.config.modifiers
[('f1', 'custom'), ('f2', 'custom')]

Args:
func_name (:obj:`str`): The name of the custom modifier type.
deps (:obj:`list`): The names of the new parameters of the modifier
function.
new_params (:obj:`dict`): The new parameters.

Returns:
:obj:`dict`: The updated ``pyhf.modifiers.histfactory_set`` with the added
custom modifier type.

.. versionadded:: 0.8.0
"""
_builder = make_builder(func_name, deps, new_params)
_applier = make_applier(func_name, deps, new_params)

modifier_set = {_applier.name: (_builder, _applier)}
modifier_set.update(**pyhf.modifiers.histfactory_set)
modifier_set.update(
**(input_set if input_set is not None else pyhf.modifiers.histfactory_set)
)
return modifier_set
Loading