Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-20845: Support re-run of pipetask on the same output collection #25

Merged
merged 7 commits into from Sep 18, 2019

Conversation

andy-slac
Copy link
Collaborator

With --skip-existing option we now also support skipping Quanta at run time so that the same QGraph can be executed again after unfinished previous attempt and it will effectively restart execution from the point where it stopped previously. Just as before --skip-existing during QGraph generation means skipping Quanta whose outputs already exist.

New option --clobber-output can be used to override datasets that exist in output collection:

  • when generating QGraph all existing outputs are ignored (including regular outputs and initOutputs),
  • when executing QGraph the existing outputs are removed prior to Quantum execution.

Adds code to build simple quantum graph with a trivial task and mock
Butler, but real in-memory sqlite registry. New unit test to execute
that graph and check resulting task outputs.
@@ -125,7 +129,7 @@ def _executeQuantaMP(self, iterable, butler, taskFactory):

# Add it to the pool and remember its result
_LOG.debug("Sumbitting %s", qdata)
args = (taskDef.taskClass, taskDef.config, qdata.quantum, butler, taskFactory)
args = (taskDef.taskClass, taskDef.config, qdata.quantum, butler, taskFactory, self.skipExisting)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is getting to be a lot of positional arguments. Maybe pass as kwargs instead (and maybe even require some/all of them to be kwargs in the signature)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the method to accept kwargs only.

types against the types of objects produced by tasks. Ideally we
would like to check that object data is identical too but presently
there is no generic way to compare objects. In the future we can
potentially introduce some extensible mechanism for that.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm now wondering if we should just explicitly avoid this whole problem with initOutputs and say that you have to use --skip-init-writes if you want to use --skip-existing when running (but maybe not when just building a graph).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potentially we can enable --skip-init-writes ourselves. The real question is do we want to verify that existing initOutputs in the butler are compatible with what pipeline expects. I think there is some usefulness in that, even if we cannot currently verify object contents for all types of objects. It may also be better to verify that initOutputs exist when we do --skip-init-writes to avoid crashes downstream.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking that forcing the user to pass --skip-init-writes explicitly was a way of emphasizing to them that no checking would be done and hence they were responsible for guaranteeing consistency.

@@ -67,6 +70,9 @@ def execute(self, taskClass, config, quantum):
Single Quantum instance.
"""
self.setupLogging(taskClass, config, quantum)
if self.skipExisting and self.quantumOutputsExist(quantum):
_LOG.info("Quantum execution skipped due to existing outputs.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to include quantum.dataId and quantum.taskName in this log message.

if ref is not None:
# It is not enough to remove dataset from collection,
# it has to be removed from butler too.
self.butler.remove(ref)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually deleting the dataset is probably okay if it was originally added to only this collection. But it's actually a big problem if it's already part of some other collection, and it wouldn't even be necessary if it was originally part of some other collection. Of course, the usual case is the one you're guarding against, where not deleting it will definitely cause problems, so what you have here is probably the best thing to do right now. But it suggests to me that we really cannot let templates depend only on collection and not run, and when we fix that we should only disassociate (and maybe then "garbage collect" unassociated datasets) here.

# check if it is there already
_LOG.debug("Retrieving InitOutputs for task=%s key=%s dsTypeName=%s",
task, name, attribute.name)
objFromStore = self.butler.get(attribute.name, {})
if objFromStore is not None:
# types are supposed to be identical
# Types are supposed to be identical.
# TODO: Check that object contets is identical too.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo: contents

@@ -207,6 +220,15 @@ def makeSimpleQGraph(nQuanta=5, pipeline=None):
pipeline : `~lsst.pipe.base.Pipeline`
If `None` then one-task pipeline is made with `AddTask` and
default `AddTaskConfig`.
butler : `~lsst.daf.butler.Butler`, optional
Data butler instance, this should be an instance retruned from a
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: returned

This option allows one to restart execution after a failure in the
middle of graph execution (and after fixing an issue). It skips writing
initOutputs that already exist. It also skips execution of all quanta
that already have their output data in output collection.
@andy-slac andy-slac merged commit ae73b45 into master Sep 18, 2019
@timj timj deleted the tickets/DM-20845 branch April 23, 2020 17:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants