Skip to content

Commit

Permalink
Merge pull request #338 from lsst/tickets/DM-37704
Browse files Browse the repository at this point in the history
DM-37704: Update code to use resolved refs.
  • Loading branch information
andy-slac committed May 26, 2023
2 parents 807f6e7 + dfcc5a2 commit ccb5b7f
Show file tree
Hide file tree
Showing 8 changed files with 28 additions and 160 deletions.
2 changes: 2 additions & 0 deletions doc/changes/DM-37704.api.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
* `ButlerQuantumContext` is updated to only need a `LimitedButler`.
* Factory methods `from_full` and `from_limited` were dropped, a constructor accepting a `LimitedButler` instance is now used to make instances.
82 changes: 14 additions & 68 deletions python/lsst/pipe/base/butlerQuantumContext.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

from typing import Any, List, Optional, Sequence, Union

from lsst.daf.butler import Butler, DatasetRef, DimensionUniverse, LimitedButler, Quantum
from lsst.daf.butler import DatasetRef, DimensionUniverse, LimitedButler, Quantum
from lsst.utils.introspection import get_full_type_name
from lsst.utils.logging import PeriodicLogger, getLogger

Expand All @@ -41,6 +41,16 @@
class ButlerQuantumContext:
"""A Butler-like class specialized for a single quantum.
Parameters
----------
butler : `lsst.daf.butler.LimitedButler`
Butler object from/to which datasets will be get/put.
quantum : `lsst.daf.butler.core.Quantum`
Quantum object that describes the datasets which will be get/put by a
single execution of this node in the pipeline graph.
Notes
-----
A ButlerQuantumContext class wraps a standard butler interface and
specializes it to the context of a given quantum. What this means
in practice is that the only gets and puts that this class allows
Expand All @@ -50,28 +60,9 @@ class ButlerQuantumContext:
what was actually get and put. This is in contrast to what the
preflight expects to be get and put by looking at the graph before
execution.
Do not use constructor directly, instead use `from_full` or `from_limited`
factory methods.
Notes
-----
`ButlerQuantumContext` instances are backed by either
`lsst.daf.butler.Butler` or `lsst.daf.butler.LimitedButler`. When a
limited butler is used then quantum has to contain dataset references
that are completely resolved (usually when graph is constructed by
GraphBuilder).
When instances are backed by full butler, the quantum graph does not have
to resolve output or intermediate references, but input references of each
quantum have to be resolved before they can be used by this class. When
executing such graphs, intermediate references used as input to some
Quantum are resolved by ``lsst.ctrl.mpexec.SingleQuantumExecutor``. If
output references of a quantum are resolved, they will be unresolved when
full butler is used.
"""

def __init__(self, *, limited: LimitedButler, quantum: Quantum, butler: Butler | None = None):
def __init__(self, butler: LimitedButler, quantum: Quantum):
self.quantum = quantum
self.allInputs = set()
self.allOutputs = set()
Expand All @@ -81,49 +72,7 @@ def __init__(self, *, limited: LimitedButler, quantum: Quantum, butler: Butler |
for refs in quantum.outputs.values():
for ref in refs:
self.allOutputs.add((ref.datasetType, ref.dataId))
self.__full_butler = butler
self.__butler = limited

@classmethod
def from_full(cls, butler: Butler, quantum: Quantum) -> ButlerQuantumContext:
    """Make ButlerQuantumContext backed by `lsst.daf.butler.Butler`.

    Parameters
    ----------
    butler : `lsst.daf.butler.Butler`
        Butler object from/to which datasets will be get/put.
    quantum : `lsst.daf.butler.core.Quantum`
        Quantum object that describes the datasets which will be get/put by
        a single execution of this node in the pipeline graph. All input
        dataset references must be resolved in this Quantum. Output
        references can be resolved, but they will be unresolved.

    Returns
    -------
    butlerQC : `ButlerQuantumContext`
        Instance of butler wrapper.
    """
    # Use ``cls`` rather than the hard-coded class name so that subclasses
    # calling this factory get instances of themselves, not of the base
    # class. A full Butler also satisfies the LimitedButler interface, so
    # it is passed as both the limited and the full backing butler.
    return cls(limited=butler, butler=butler, quantum=quantum)

@classmethod
def from_limited(cls, butler: LimitedButler, quantum: Quantum) -> ButlerQuantumContext:
    """Make ButlerQuantumContext backed by `lsst.daf.butler.LimitedButler`.

    Parameters
    ----------
    butler : `lsst.daf.butler.LimitedButler`
        Butler object from/to which datasets will be get/put.
    quantum : `lsst.daf.butler.core.Quantum`
        Quantum object that describes the datasets which will be get/put by
        a single execution of this node in the pipeline graph. Both input
        and output dataset references must be resolved in this Quantum.

    Returns
    -------
    butlerQC : `ButlerQuantumContext`
        Instance of butler wrapper.
    """
    # Use ``cls`` rather than the hard-coded class name so that subclasses
    # calling this factory get instances of themselves, not of the base
    # class.
    return cls(limited=butler, quantum=quantum)
self.__butler = butler

def _get(self, ref: Optional[Union[DeferredDatasetRef, DatasetRef]]) -> Any:
# Butler methods below will check for unresolved DatasetRefs and
Expand All @@ -140,10 +89,7 @@ def _get(self, ref: Optional[Union[DeferredDatasetRef, DatasetRef]]) -> Any:
def _put(self, value: Any, ref: DatasetRef) -> None:
"""Store data in butler"""
self._checkMembership(ref, self.allOutputs)
if self.__full_butler is not None:
self.__full_butler.put(value, ref)
else:
self.__butler.put(value, ref)
self.__butler.put(value, ref)

def get(
self,
Expand Down
13 changes: 5 additions & 8 deletions python/lsst/pipe/base/executionButlerBuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,9 @@ def _accumulate(
if not isinstance(refs, list):
refs = [refs]
for ref in refs:
if ref.id is not None:
# We could check existence of individual components,
# but it should be less work to check their parent.
if ref.isComponent():
ref = ref.makeCompositeRef()
check_refs.add(ref)
if ref.isComponent():
ref = ref.makeCompositeRef()
check_refs.add(ref)
exist_map = butler.datastore.knows_these(check_refs)
existing_ids = set(ref.id for ref, exists in exist_map.items() if exists)
del exist_map
Expand All @@ -189,7 +186,7 @@ def _accumulate(
for ref in refs:
# Component dataset ID is the same as its parent ID, so
# checking component in existing_ids works OK.
if ref.id is not None and ref.id in existing_ids:
if ref.id in existing_ids:
# If this is a component we want the composite to be
# exported.
if ref.isComponent():
Expand Down Expand Up @@ -351,7 +348,7 @@ def _import(
# because execution butler is assumed to be able to see all the file
# locations that the main datastore can see. "split" supports some
# absolute URIs in the datastore.
newButler.import_(filename=yamlBuffer, format="yaml", reuseIds=True, transfer="split")
newButler.import_(filename=yamlBuffer, format="yaml", transfer="split")

# If there is modifier callable, run it to make necessary updates
# to the new butler.
Expand Down
3 changes: 1 addition & 2 deletions python/lsst/pipe/base/graphBuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ def makeQuantum(self, datastore_records: Optional[Mapping[str, DatastoreRecordDa
quantum_records = {}
input_refs = list(itertools.chain.from_iterable(helper.inputs.values()))
input_refs += list(initInputs.values())
input_ids = set(ref.id for ref in input_refs if ref.id is not None)
input_ids = set(ref.id for ref in input_refs)
for datastore_name, records in datastore_records.items():
matching_records = records.subset(input_ids)
if matching_records is not None:
Expand Down Expand Up @@ -621,7 +621,6 @@ def resolveRef(self, dataset_type: DatasetType, data_id: DataCoordinate) -> Data
if key not in self.resolved:
raise ValueError(f"Composite dataset is missing from cache: {parent_type} {data_id}")
parent_ref = self.resolved[key]
assert parent_ref.id is not None and parent_ref.run is not None, "parent ref must be resolved"
return DatasetRef(dataset_type, data_id, id=parent_ref.id, run=parent_ref.run, conform=False)

key = dataset_type, data_id
Expand Down
2 changes: 1 addition & 1 deletion python/lsst/pipe/base/script/transfer_from_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def transfer_from_graph(
# Make QBB, its config is the same as output Butler.
qbb = QuantumBackedButler.from_predicted(
config=dest,
predicted_inputs=[ref.getCheckedId() for ref in output_refs],
predicted_inputs=[ref.id for ref in output_refs],
predicted_outputs=[],
dimensions=qgraph.universe,
datastore_records={},
Expand Down
39 changes: 1 addition & 38 deletions python/lsst/pipe/base/testUtils.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,42 +279,6 @@ def _refFromConnection(
raise ValueError(f"Dataset type ({connection.name}) and ID {dataId.byName()} not compatible.") from e


def _resolveTestQuantumInputs(butler: Butler, quantum: Quantum) -> None:
    """Look up all input datasets of a test quantum in the `Registry` to
    resolve all `DatasetRef` objects (i.e. ensure they have not-`None` ``id``
    and ``run`` attributes).

    Parameters
    ----------
    quantum : `~lsst.daf.butler.Quantum`
        Single Quantum instance.
    butler : `~lsst.daf.butler.Butler`
        Data butler.
    """
    # TODO (DM-26819): This function is a direct copy of
    # `lsst.ctrl.mpexec.SingleQuantumExecutor.updateQuantumInputs`, but the
    # `runTestQuantum` function that calls it is essentially duplicating logic
    # in that class as well (albeit not verbatim). We should probably move
    # `SingleQuantumExecutor` to ``pipe_base`` and see if it is directly usable
    # in test code instead of having these classes at all.

    def resolve_single(ref):
        # Refs that already carry a dataset ID pass through untouched.
        if ref.id is not None:
            return ref
        found = butler.registry.findDataset(
            ref.datasetType, ref.dataId, collections=butler.collections
        )
        if found is None:
            raise ValueError(
                f"Cannot find {ref.datasetType.name} with id {ref.dataId} "
                f"in collections {butler.collections}."
            )
        return found

    for refsForDatasetType in quantum.inputs.values():
        # Slice assignment mutates the Quantum's own list in place.
        refsForDatasetType[:] = [resolve_single(ref) for ref in refsForDatasetType]


def runTestQuantum(
task: PipelineTask, butler: Butler, quantum: Quantum, mockRun: bool = True
) -> Optional[unittest.mock.Mock]:
Expand All @@ -339,8 +303,7 @@ def runTestQuantum(
If ``mockRun`` is set, the mock that replaced ``run``. This object can
be queried for the arguments ``runQuantum`` passed to ``run``.
"""
_resolveTestQuantumInputs(butler, quantum)
butlerQc = ButlerQuantumContext.from_full(butler, quantum)
butlerQc = ButlerQuantumContext(butler, quantum)
# This is a type ignore, because `connections` is a dynamic class, but
# it for sure will have this property
connections = task.config.ConnectionsClass(config=task.config) # type: ignore
Expand Down
28 changes: 0 additions & 28 deletions tests/test_graphBuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,34 +134,6 @@ def test_datastore_records(self):
else:
self.assertEqual(quantum.datastore_records, {})

def testResolveRefs(self):
    """Test for GraphBuilder returning graph with all resolved refs."""

    def check_resolved(refs):
        # Every reference must carry a non-None dataset ID.
        self.assertTrue(all(ref.id is not None for ref in refs))

    with temporaryDirectory() as root:
        _, qgraph = simpleQGraph.makeSimpleQGraph(root=root)
        self.assertEqual(len(qgraph), 5)

        # Check per-quantum inputs/outputs.
        for node in qgraph:
            for refs in node.quantum.inputs.values():
                check_resolved(refs)
            for refs in node.quantum.outputs.values():
                check_resolved(refs)

        # Check per-task init-inputs/init-outputs.
        for taskDef in qgraph.iterTaskGraph():
            init_inputs = qgraph.initInputRefs(taskDef)
            if init_inputs is not None:
                check_resolved(init_inputs)
            init_outputs = qgraph.initOutputRefs(taskDef)
            if init_outputs is not None:
                check_resolved(init_outputs)

        # Check global init-outputs.
        check_resolved(qgraph.globalInitOutputRefs())


if __name__ == "__main__":
lsst.utils.tests.init()
Expand Down
19 changes: 4 additions & 15 deletions tests/test_pipelineTask.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ def __init__(self) -> None:
self.registry = SimpleNamespace(dimensions=DimensionUniverse())

def get(self, ref: DatasetRef) -> Any:
# Requires resolved ref.
assert ref.id is not None
dsdata = self.datasets.get(ref.datasetType.name)
if dsdata:
return dsdata.get(ref.dataId)
Expand Down Expand Up @@ -161,10 +159,7 @@ def _testRunQuantum(self, full_butler: bool) -> None:
# run task on each quanta
checked_get = False
for quantum in quanta:
if full_butler:
butlerQC = pipeBase.ButlerQuantumContext.from_full(butler, quantum)
else:
butlerQC = pipeBase.ButlerQuantumContext.from_limited(butler, quantum)
butlerQC = pipeBase.ButlerQuantumContext(butler, quantum)
inputRefs, outputRefs = connections.buildDatasetRefs(quantum)
task.runQuantum(butlerQC, inputRefs, outputRefs)

Expand Down Expand Up @@ -241,19 +236,13 @@ def _testChain2(self, full_butler: bool) -> None:
ref = quantum.inputs[dstype0.name][0]
butler.put(100 + i, ref)

butler_qc_factory = (
pipeBase.ButlerQuantumContext.from_full
if full_butler
else pipeBase.ButlerQuantumContext.from_limited
)

# run task on each quanta
for quantum in quanta1:
butlerQC = butler_qc_factory(butler, quantum)
butlerQC = pipeBase.ButlerQuantumContext(butler, quantum)
inputRefs, outputRefs = connections1.buildDatasetRefs(quantum)
task1.runQuantum(butlerQC, inputRefs, outputRefs)
for quantum in quanta2:
butlerQC = butler_qc_factory(butler, quantum)
butlerQC = pipeBase.ButlerQuantumContext(butler, quantum)
inputRefs, outputRefs = connections2.buildDatasetRefs(quantum)
task2.runQuantum(butlerQC, inputRefs, outputRefs)

Expand Down Expand Up @@ -289,7 +278,7 @@ def testButlerQC(self):
ref = quantum.inputs[dstype0.name][0]
butler.put(100, ref)

butlerQC = pipeBase.ButlerQuantumContext.from_full(butler, quantum)
butlerQC = pipeBase.ButlerQuantumContext(butler, quantum)

# Pass ref as single argument or a list.
obj = butlerQC.get(ref)
Expand Down

0 comments on commit ccb5b7f

Please sign in to comment.