Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-39661: Rename ButlerQuantumContext to QuantumContext #344

Merged
merged 13 commits into from
Jun 23, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def __init__(self, config: pexConfig.Config, initInput: Mapping, *args, **kwargs

def runQuantum(
self,
butlerQC: pipeBase.ButlerQuantumContext,
butlerQC: pipeBase.QuantumContext,
inputRefs: pipeBase.InputQuantizedConnection,
outputRefs: pipeBase.OutputQuantizedConnection,
):
Expand Down
3 changes: 3 additions & 0 deletions doc/changes/DM-39661.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
* New class ``ExecutionResources`` has been created to record the number of cores and the amount of memory that have been allocated for the execution of a quantum.
* ``QuantumContext`` (newly renamed from ``ButlerQuantumContext``) now has a ``resources`` property that can be queried by a task in ``runQuantum``.
This can be used to tell the task that it can use multiple cores or possibly should make a more efficient use of the available memory resources.
2 changes: 2 additions & 0 deletions doc/changes/DM-39661.removal.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
``ButlerQuantumContext`` has been renamed to ``QuantumContext``.
This reflects the additional functionality it now has.
2 changes: 1 addition & 1 deletion python/lsst/pipe/base/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
from ._dataset_handle import *
from ._instrument import *
from ._observation_dimension_packer import *
from ._quantumContext import *
from ._status import *
from ._task_metadata import *
from .butlerQuantumContext import *
from .config import *
from .connections import *
from .executionButlerBuilder import *
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,15 @@
"""Module defining a butler-like object specialized to a specific quantum.
"""

__all__ = ("ButlerQuantumContext",)
__all__ = ("ButlerQuantumContext", "ExecutionResources", "QuantumContext")

from collections.abc import Sequence
import numbers
from collections.abc import Callable, Sequence
from dataclasses import dataclass
from typing import Any

import astropy.units as u
from deprecated.sphinx import deprecated
from lsst.daf.butler import DatasetRef, DimensionUniverse, LimitedButler, Quantum
from lsst.utils.introspection import get_full_type_name
from lsst.utils.logging import PeriodicLogger, getLogger
Expand All @@ -39,8 +43,120 @@
_LOG = getLogger(__name__)


class ButlerQuantumContext:
"""A Butler-like class specialized for a single quantum.
@dataclass(init=False, frozen=True)
class ExecutionResources:
    """A description of the resources available to a running quantum.

    Parameters
    ----------
    num_cores : `int`, optional
        The number of cores allocated to the task.
    max_mem : `~astropy.units.Quantity`, `numbers.Real`, `str`, or `None`,\
        optional
        The amount of memory allocated to the task. Can be specified
        as byte-compatible `~astropy.units.Quantity`, a plain number,
        a string with a plain number, or a string representing a quantity.
        If `None` no limit is specified.
    default_mem_units : `astropy.units.Unit`, optional
        The default unit to apply when the ``max_mem`` value is given
        as a plain number.
    """

    num_cores: int = 1
    """The maximum number of cores that the task can use."""

    max_mem: u.Quantity | None = None
    """If defined, the amount of memory allocated to the task.
    """

    def __init__(
        self,
        *,
        num_cores: int = 1,
        max_mem: u.Quantity | numbers.Real | str | None = None,
        default_mem_units: u.Unit = u.B,
    ):
        # The dataclass declaration is strict but callers are allowed to be
        # flexible about how they specify the memory limit, so a hand-written
        # __init__ normalizes the input. Assignment must go through
        # object.__setattr__ because the instance is frozen.
        if num_cores < 1:
            raise ValueError("The number of cores must be a positive integer")

        object.__setattr__(self, "num_cores", num_cores)

        normalized: u.Quantity | None
        if max_mem is None or isinstance(max_mem, u.Quantity):
            normalized = max_mem
        elif max_mem == "":
            # Some command line tooling can treat no value as empty string.
            normalized = None
        else:
            try:
                # A bare number (or numeric string) gets the default units.
                normalized = float(max_mem) * default_mem_units
            except ValueError:
                # Not a plain number; let astropy parse a quantity string
                # such as "2GB".
                normalized = u.Quantity(max_mem)

        if normalized is not None:
            # Force to bytes. This also checks that we can convert to bytes.
            normalized = normalized.to(u.B)

        object.__setattr__(self, "max_mem", normalized)

    def __deepcopy__(self, memo: Any) -> ExecutionResources:
        """Return ``self``: the class is frozen so a copy is unnecessary."""
        return self

    def _reduce_kwargs(self) -> dict[str, Any]:
        """Return a dict of the keyword arguments that should be used
        by `__reduce__`.

        This is necessary because the dataclass is defined to be keyword
        only and we wish the default pickling to only store a plain number
        for the memory allocation and not a large Quantity.

        Returns
        -------
        kwargs : `dict`
            Keyword arguments to be used when pickling.
        """
        kw: dict[str, Any] = {"num_cores": self.num_cores}
        if self.max_mem is not None:
            # .value is a numpy float. Cast it to a python int since we
            # do not want fractional bytes. The constructor ensures that this
            # uses units of byte so we do not have to convert.
            kw["max_mem"] = int(self.max_mem.value)
        return kw

    @staticmethod
    def _unpickle_via_factory(
        cls: type[ExecutionResources], args: Sequence[Any], kwargs: dict[str, Any]
    ) -> ExecutionResources:
        """Unpickle something by calling a factory.

        Allows unpickle using `__reduce__` with keyword
        arguments as well as positional arguments.
        """
        return cls(**kwargs)

    def __reduce__(
        self,
    ) -> tuple[
        Callable[[type[ExecutionResources], Sequence[Any], dict[str, Any]], ExecutionResources],
        tuple[type[ExecutionResources], Sequence[Any], dict[str, Any]],
    ]:
        """Pickle via the keyword-argument factory."""
        return self._unpickle_via_factory, (type(self), [], self._reduce_kwargs())


class QuantumContext:
"""A Butler-like class specialized for a single quantum along with
context information that can influence how the task is executed.

Parameters
----------
Expand All @@ -49,10 +165,12 @@ class ButlerQuantumContext:
quantum : `lsst.daf.butler.core.Quantum`
Quantum object that describes the datasets which will be get/put by a
single execution of this node in the pipeline graph.
resources : `ExecutionResources`, optional
The resources allocated for executing quanta.

Notes
-----
A ButlerQuantumContext class wraps a standard butler interface and
A `QuantumContext` class wraps a standard butler interface and
specializes it to the context of a given quantum. What this means
in practice is that the only gets and puts that this class allows
are DatasetRefs that are contained in the quantum.
Expand All @@ -63,8 +181,16 @@ class ButlerQuantumContext:
execution.
"""

def __init__(self, butler: LimitedButler, quantum: Quantum):
resources: ExecutionResources

def __init__(
self, butler: LimitedButler, quantum: Quantum, *, resources: ExecutionResources | None = None
):
self.quantum = quantum
if resources is None:
resources = ExecutionResources()
self.resources = resources

self.allInputs = set()
self.allOutputs = set()
for refs in quantum.inputs.values():
Expand Down Expand Up @@ -260,7 +386,7 @@ def _checkMembership(self, ref: list[DatasetRef] | DatasetRef, inout: set) -> No
"""Check if a `~lsst.daf.butler.DatasetRef` is part of the input
`~lsst.daf.butler.Quantum`.

This function will raise an exception if the `ButlerQuantumContext` is
This function will raise an exception if the `QuantumContext` is
used to get/put a `~lsst.daf.butler.DatasetRef` which is not defined
in the quantum.

Expand All @@ -287,3 +413,15 @@ def dimensions(self) -> DimensionUniverse:
repository (`~lsst.daf.butler.DimensionUniverse`).
"""
return self.__butler.dimensions


@deprecated(
    reason="ButlerQuantumContext has been renamed to QuantumContext and been given extra functionality. "
    "Please use the new name. Will be removed after v27.",
    version="v26",
    category=FutureWarning,
)
class ButlerQuantumContext(QuantumContext):
    """Deprecated alias for `QuantumContext`.

    Retained for backwards compatibility only; the ``@deprecated`` wrapper
    issues a `FutureWarning` on use. New code should use `QuantumContext`
    directly.
    """
2 changes: 1 addition & 1 deletion python/lsst/pipe/base/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ def applyConfigOverrides(
pipelineConfigs : `~collections.abc.Iterable` of `ConfigIR`
An iterable of `ConfigIR` objects that contain overrides
to apply to this config instance.
parameters : `ParametersIR`
parameters : `~.pipelineIR.ParametersIR`
Parameters defined in a Pipeline which are used in formatting
of config values across multiple `Task`\ s in a pipeline.
label : `str`
Expand Down
21 changes: 16 additions & 5 deletions python/lsst/pipe/base/pipelineIR.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,16 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations

__all__ = ("ConfigIR", "ContractError", "ContractIR", "ImportIR", "PipelineIR", "TaskIR", "LabeledSubset")
__all__ = (
"ConfigIR",
"ContractError",
"ContractIR",
"ImportIR",
"LabeledSubset",
"ParametersIR",
"PipelineIR",
"TaskIR",
)

import copy
import enum
Expand Down Expand Up @@ -185,9 +194,11 @@ def to_primitives(self) -> dict[str, list[str] | str]:


@dataclass
class ParametersIR:
"""Intermediate representation of parameters that are global to a pipeline
class ParametersIR: # noqa: D405,D406,D407,D214 ("parameters" in code block)
"""Intermediate representation of parameters that are global to a pipeline.

Notes
-----
These parameters are specified under a top level key named ``parameters``
and are declared as a yaml mapping. These entries can then be used inside
task configuration blocks to specify configuration values. They may not be
Expand Down Expand Up @@ -279,12 +290,12 @@ def formatted(self, parameters: ParametersIR) -> ConfigIR:

Parameters
----------
parameters : ParametersIR
parameters : `ParametersIR`
Object that contains variable mappings used in substitution.

Returns
-------
config : ConfigIR
config : `ConfigIR`
A new ConfigIR object formatted with the input parameters
"""
new_config = copy.deepcopy(self)
Expand Down
8 changes: 4 additions & 4 deletions python/lsst/pipe/base/pipelineTask.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

from lsst.utils.logging import LsstLogAdapter

from .butlerQuantumContext import ButlerQuantumContext
from ._quantumContext import QuantumContext
from .config import PipelineTaskConfig
from .struct import Struct

Expand All @@ -59,7 +59,7 @@ class PipelineTask(Task):
sub-class) where all necessary I/O is performed, it reads all input data
from data butler into memory, calls `run()` method with that data, examines
returned `Struct` object and saves some or all of that data back to data
butler. `runQuantum()` method receives a `ButlerQuantumContext` instance to
butler. `runQuantum()` method receives a `QuantumContext` instance to
facilitate I/O, a `InputQuantizedConnection` instance which defines all
input `lsst.daf.butler.DatasetRef`, and a `OutputQuantizedConnection`
instance which defines all the output `lsst.daf.butler.DatasetRef` for a
Expand Down Expand Up @@ -151,7 +151,7 @@ def run(self, input, calib):

def runQuantum(
self,
butlerQC: ButlerQuantumContext,
butlerQC: QuantumContext,
inputRefs: InputQuantizedConnection,
outputRefs: OutputQuantizedConnection,
) -> None:
Expand All @@ -160,7 +160,7 @@ def runQuantum(

Parameters
----------
butlerQC : `ButlerQuantumContext`
butlerQC : `QuantumContext`
A butler which is specialized to operate in the context of a
`lsst.daf.butler.Quantum`.
inputRefs : `InputQuantizedConnection`
Expand Down
6 changes: 3 additions & 3 deletions python/lsst/pipe/base/testUtils.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
)
from lsst.pipe.base.connectionTypes import BaseConnection, DimensionedConnection

from .butlerQuantumContext import ButlerQuantumContext
from ._quantumContext import QuantumContext

if TYPE_CHECKING:
from .config import PipelineTaskConfig
Expand Down Expand Up @@ -302,14 +302,14 @@ def runTestQuantum(
If ``mockRun`` is set, the mock that replaced ``run``. This object can
be queried for the arguments ``runQuantum`` passed to ``run``.
"""
butlerQc = ButlerQuantumContext(butler, quantum)
butlerQc = QuantumContext(butler, quantum)
# This is a type ignore, because `connections` is a dynamic class, but
# it for sure will have this property
connections = task.config.ConnectionsClass(config=task.config) # type: ignore
inputRefs, outputRefs = connections.buildDatasetRefs(quantum)
if mockRun:
with unittest.mock.patch.object(task, "run") as mock, unittest.mock.patch(
"lsst.pipe.base.ButlerQuantumContext.put"
"lsst.pipe.base.QuantumContext.put"
):
task.runQuantum(butlerQc, inputRefs, outputRefs)
return mock
Expand Down
4 changes: 2 additions & 2 deletions python/lsst/pipe/base/tests/mocks/_pipeline_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
_LOG = logging.getLogger(__name__)

if TYPE_CHECKING:
from ...butlerQuantumContext import ButlerQuantumContext
from ..._quantumContext import QuantumContext


def mock_task_defs(
Expand Down Expand Up @@ -238,7 +238,7 @@ def __init__(

def runQuantum(
self,
butlerQC: ButlerQuantumContext,
butlerQC: QuantumContext,
inputRefs: InputQuantizedConnection,
outputRefs: OutputQuantizedConnection,
) -> None:
Expand Down