Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-38210: Use butler.get() rather than deprecated getDirect() #227

Merged
merged 4 commits into from
Apr 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/lsst/ctrl/mpexec/cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ def makeWriteButler(cls, args: SimpleNamespace, taskDefs: Optional[Iterable[Task
# collection from its chain collection first.
with butler.transaction():
butler.registry.setCollectionChain(self.output.name, chainDefinition, flatten=True)
butler.pruneCollection(replaced, purge=True, unstore=True)
butler.removeRuns([replaced], unstore=True)
elif args.prune_replaced is not None:
raise NotImplementedError(f"Unsupported --prune-replaced option '{args.prune_replaced}'.")
if not self.output.exists:
Expand Down
2 changes: 1 addition & 1 deletion python/lsst/ctrl/mpexec/log_capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ def _store_log_records(
raise InvalidQuantumError(
f"Quantum contains unresolved reference for task log output dataset type {dataset_type}."
)
self.butler.putDirect(log_handler.records, ref)
self.butler.put(log_handler.records, ref)
else:
self.full_butler.put(log_handler.records, ref)

Expand Down
14 changes: 7 additions & 7 deletions python/lsst/ctrl/mpexec/preExecInit.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def saveInitOutputs(self, graph: QuantumGraph) -> None:
_LOG.debug(
"Retrieving InitOutputs for task=%s key=%s dsTypeName=%s", task, name, attribute.name
)
obj_from_store = self.butler.getDirect(init_output_ref)
obj_from_store = self.butler.get(init_output_ref)
# Types are supposed to be identical.
# TODO: Check that object contents is identical too.
if type(obj_from_store) is not type(init_output_var):
Expand All @@ -192,7 +192,7 @@ def saveInitOutputs(self, graph: QuantumGraph) -> None:
else:
_LOG.debug("Saving InitOutputs for task=%s key=%s", taskDef.label, name)
# This can still raise if there is a concurrent write.
self.butler.putDirect(init_output_var, init_output_ref)
self.butler.put(init_output_var, init_output_ref)

def saveConfigs(self, graph: QuantumGraph) -> None:
"""Write configurations for pipeline tasks to butler or check that
Expand Down Expand Up @@ -234,7 +234,7 @@ def logConfigMismatch(msg: str) -> None:
else:
# butler will raise exception if dataset is already there
_LOG.debug("Saving Config for task=%s dataset type=%s", taskDef.label, config_name)
self.butler.putDirect(taskDef.config, dataset_ref)
self.butler.put(taskDef.config, dataset_ref)

def savePackageVersions(self, graph: QuantumGraph) -> None:
"""Write versions of software packages to butler.
Expand Down Expand Up @@ -271,9 +271,9 @@ def savePackageVersions(self, graph: QuantumGraph) -> None:
# have to remove existing dataset first, butler has no
# replace option.
self.butler.pruneDatasets([dataset_ref], unstore=True, purge=True)
self.butler.putDirect(old_packages, dataset_ref)
self.butler.put(old_packages, dataset_ref)
else:
self.butler.putDirect(packages, dataset_ref)
self.butler.put(packages, dataset_ref)

@abc.abstractmethod
def find_init_input_refs(self, taskDef: TaskDef, graph: QuantumGraph) -> Iterable[DatasetRef]:
Expand Down Expand Up @@ -526,7 +526,7 @@ def _find_existing(
ref = self.full_butler.registry.findDataset(dataset_type, dataId, collections=[run])
if self.extendRun and ref is not None:
try:
config = self.butler.getDirect(ref)
config = self.butler.get(ref)
return config, ref
except (LookupError, FileNotFoundError):
return None, ref
Expand Down Expand Up @@ -580,7 +580,7 @@ def _find_existing(self, refs: Iterable[DatasetRef], dataset_type: str) -> tuple
for ref in refs:
if ref.datasetType.name == dataset_type:
try:
data = self.butler.getDirect(ref)
data = self.butler.get(ref)
return data, ref
except (LookupError, FileNotFoundError):
return None, ref
Expand Down
2 changes: 1 addition & 1 deletion python/lsst/ctrl/mpexec/singleQuantumExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ def writeMetadata(
ref = ref.unresolved()
self.butler.put(metadata, ref)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@TallJimbo this fails if we don't unresolve the ref because of this test code for replace run in test_cmdLineFwk:

        # re-run with --replace-run (--inputs is ignored, as long as it hasn't
        # changed)
        args.replace_run = True
        args.output_run = "output/run2"
        fwk.runPipeline(copy.deepcopy(qgraph), taskFactory, args)

and you get a resolved ref for run output/run1 being put into output/run2 and it failing because it wants to put it into output/run1.

@andy-slac Interestingly we also get another failure in the simplest possible test:

        qgraph = fwk.makeGraph(self.pipeline, args)
        # run whole thing
        fwk.runPipeline(qgraph, taskFactory, args)

where the problem is:

'output/20230330T204714Z' != 'output/20230330T204715Z'

with error: No collection with name 'output/20230330T204714Z' found. which makes me wonder how we are generating two different versions of that timestamped output run.

Copy link
Member Author

@timj timj Mar 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course that second test failure with the timestamp is random because it depends on timing. I can also get testSimpleQGraphClobberOutputs to fail with the same timestamp mismatch as testSimpleQGraph. (it's the _metadata dataset each time)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't unresolve refs then in runPipeline we should probably take output run from QuantumGraph. But I'm not sure how this will interact with all other options.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I say on Slack, recalculating the graph each time in the --replace-run test does work. This is going to have to be required unless --replace-run calls some kind of method on the graph to recalculate all the output dataset refs with a new run (and then you lose provenance in the graph that is on disk).

else:
limited_butler.putDirect(metadata, ref)
limited_butler.put(metadata, ref)

def initGlobals(self, quantum: Quantum) -> None:
"""Initialize global state needed for task execution.
Expand Down
2 changes: 1 addition & 1 deletion python/lsst/ctrl/mpexec/taskFactory.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def makeTask(
dataset_type_name = attribute.name
for ref in initInputRefs:
if ref.datasetType.name == dataset_type_name:
init_inputs[name] = butler.getDirect(ref)
init_inputs[name] = butler.get(ref)
break

# make task instance
Expand Down
18 changes: 11 additions & 7 deletions tests/test_cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
"""

import contextlib
import copy
import logging
import os
import pickle
Expand Down Expand Up @@ -739,7 +738,7 @@ def testSimpleQGraphReplaceRun(self):
self.assertEqual(len(qgraph), self.nQuanta)

# deep copy is needed because quanta are updated in place
fwk.runPipeline(copy.deepcopy(qgraph), taskFactory, args)
fwk.runPipeline(qgraph, taskFactory, args)
self.assertEqual(taskFactory.countExec, self.nQuanta)

# need to refresh collections explicitly (or make new butler/registry)
Expand All @@ -762,7 +761,8 @@ def testSimpleQGraphReplaceRun(self):
# changed)
args.replace_run = True
args.output_run = "output/run2"
fwk.runPipeline(copy.deepcopy(qgraph), taskFactory, args)
qgraph = fwk.makeGraph(self.pipeline, args)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made this change to demonstrate the fix but I wonder if we are punting to fixing this properly with unresolved ref removal that maybe I should not include this change here. @andy-slac would you rather this test failed once you remove unresolved refs or are you happy that this is the right fix for replace-run testing anyhow and so should keep this fix?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm OK with this change. I guess once we have the ability to rewrite output run in a graph we will add a new unit test for that.

fwk.runPipeline(qgraph, taskFactory, args)

butler.registry.refresh()
collections = set(butler.registry.queryCollections(...))
Expand All @@ -780,7 +780,8 @@ def testSimpleQGraphReplaceRun(self):
args.replace_run = True
args.prune_replaced = "unstore"
args.output_run = "output/run3"
fwk.runPipeline(copy.deepcopy(qgraph), taskFactory, args)
qgraph = fwk.makeGraph(self.pipeline, args)
fwk.runPipeline(qgraph, taskFactory, args)

butler.registry.refresh()
collections = set(butler.registry.queryCollections(...))
Expand Down Expand Up @@ -810,7 +811,8 @@ def testSimpleQGraphReplaceRun(self):
args.replace_run = True
args.prune_replaced = "purge"
args.output_run = "output/run4"
fwk.runPipeline(copy.deepcopy(qgraph), taskFactory, args)
qgraph = fwk.makeGraph(self.pipeline, args)
fwk.runPipeline(qgraph, taskFactory, args)

butler.registry.refresh()
collections = set(butler.registry.queryCollections(...))
Expand All @@ -828,7 +830,8 @@ def testSimpleQGraphReplaceRun(self):
args.prune_replaced = None
args.replace_run = True
args.output_run = "output/run5"
fwk.runPipeline(copy.deepcopy(qgraph), taskFactory, args)
qgraph = fwk.makeGraph(self.pipeline, args)
fwk.runPipeline(qgraph, taskFactory, args)
butler.registry.refresh()
collections = set(butler.registry.queryCollections(...))
self.assertEqual(collections, {"test", "output", "output/run1", "output/run2", "output/run4"})
Expand All @@ -837,7 +840,8 @@ def testSimpleQGraphReplaceRun(self):
args.prune_replaced = None
args.replace_run = True
args.output_run = "output/run6"
fwk.runPipeline(copy.deepcopy(qgraph), taskFactory, args)
qgraph = fwk.makeGraph(self.pipeline, args)
fwk.runPipeline(qgraph, taskFactory, args)
butler.registry.refresh()
collections = set(butler.registry.queryCollections(...))
self.assertEqual(collections, {"test", "output", "output/run1", "output/run2", "output/run4"})
Expand Down