Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-38601: Fix checkExistingOutputs method to clobber complete outputs #229

Merged
merged 2 commits into from
Apr 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions doc/changes/DM-38601.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fixed `SingleQuantumExecutor` class to correctly handle the case with `clobberOutputs=True` and `skipExistingIn=None`.
Documentation says that complete quantum outputs should be removed in this case, but they were not removed.
10 changes: 9 additions & 1 deletion python/lsst/ctrl/mpexec/singleQuantumExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,16 +371,19 @@ def findOutputs(
missingRefs.append(registryRefToQuantumRef[ref])
return existingRefs, missingRefs

# If skipExistingIn is None this will search in butler.run.
existingRefs, missingRefs = findOutputs(self.skipExistingIn)
if self.skipExistingIn:
if existingRefs and not missingRefs:
# everything is already there
# Everything is already there, and we do not clobber complete
# outputs if skipExistingIn is specified.
return True

# If we are to re-run the quantum, prune datasets that exist in the
# output run collection, but only if `self.clobberOutputs` is set;
# this only works when we have a full butler.
if existingRefs and self.butler is not None:
# Look at butler run instead of skipExistingIn collections.
existingRefs, missingRefs = findOutputs(self.butler.run)
if existingRefs and missingRefs:
_LOG.debug(
Expand All @@ -403,6 +406,11 @@ def findOutputs(
f" collection={self.butler.run} existingRefs={existingRefs}"
f" missingRefs={missingRefs}"
)
elif existingRefs and self.clobberOutputs and not self.skipExistingIn:
# Clobber complete outputs if skipExistingIn is not specified.
_LOG.info("Removing complete outputs for task %s: %s", taskDef, existingRefs)
self.butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True)
return False

# need to re-run
return False
Expand Down
112 changes: 112 additions & 0 deletions tests/test_executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import faulthandler
import logging
import os
import signal
import sys
import time
Expand All @@ -40,14 +41,19 @@
MPTimeoutError,
QuantumExecutor,
QuantumReport,
SingleQuantumExecutor,
)
from lsst.ctrl.mpexec.execFixupDataId import ExecFixupDataId
from lsst.daf.butler.tests.utils import makeTestTempDir, removeTestTempDir
from lsst.pipe.base import NodeId
from lsst.pipe.base.tests.simpleQGraph import AddTaskFactoryMock, makeSimpleQGraph

# Configure the root logger to emit DEBUG-level messages.
logging.basicConfig(level=logging.DEBUG)

# Module-level logger named after this module.
_LOG = logging.getLogger(__name__)

# Absolute path of the directory containing this test file; it is passed to
# makeTestTempDir() in SingleQuantumExecutorTestCase.setUp.
TESTDIR = os.path.abspath(os.path.dirname(__file__))


class QuantumExecutorMock(QuantumExecutor):
"""Mock class for QuantumExecutor"""
Expand Down Expand Up @@ -584,5 +590,111 @@ def test_mpexec_num_fd(self):
self.assertLess(num_fds_1 - num_fds_0, 5)


class SingleQuantumExecutorTestCase(unittest.TestCase):
    """Tests for SingleQuantumExecutor implementation.

    Every test builds a butler and a one-quantum graph rooted in a per-test
    temporary directory, runs ``SingleQuantumExecutor.execute`` one or more
    times, and checks both the execution counter of the task factory mock
    and the "add_dataset1" datasets found in the butler run collection.
    """

    instrument = "lsst.pipe.base.tests.simpleQGraph.SimpleInstrument"

    def setUp(self):
        # Temporary directory for this test, created under TESTDIR.
        self.root = makeTestTempDir(TESTDIR)

    def tearDown(self):
        removeTestTempDir(self.root)

    def _make_single_node(self):
        """Build a one-quantum graph and return ``(butler, node)``.

        Asserts that the graph holds exactly the requested number of nodes.
        """
        quantum_count = 1
        butler, qgraph = makeSimpleQGraph(quantum_count, root=self.root, instrument=self.instrument)

        graph_nodes = list(qgraph)
        self.assertEqual(len(graph_nodes), quantum_count)
        return butler, graph_nodes[0]

    def _single_output_ref(self, butler):
        """Return the only "add_dataset1" dataset ref in the butler run.

        Asserts that the query yields exactly one dataset.
        """
        query = butler.registry.queryDatasets("add_dataset1", collections=butler.run)
        found = list(query)
        self.assertEqual(len(found), 1)
        return found[0]

    def test_simple_execute(self) -> None:
        """Run execute() method in simplest setup."""
        butler, node = self._make_single_node()

        task_factory = AddTaskFactoryMock()
        SingleQuantumExecutor(butler, task_factory).execute(node.taskDef, node.quantum)
        self.assertEqual(task_factory.countExec, 1)

        # There must be one dataset of task's output connection
        self._single_output_ref(butler)

    def test_skip_existing_execute(self) -> None:
        """Run execute() method twice, with skip_existing_in."""
        butler, node = self._make_single_node()

        task_factory = AddTaskFactoryMock()
        first_run = SingleQuantumExecutor(butler, task_factory)
        first_run.execute(node.taskDef, node.quantum)
        self.assertEqual(task_factory.countExec, 1)

        original_id = self._single_output_ref(butler).id

        # Second pass with skipExistingIn pointing at the output run: the
        # task must not execute again and the dataset must be untouched.
        assert butler.run is not None
        second_run = SingleQuantumExecutor(butler, task_factory, skipExistingIn=[butler.run])
        second_run.execute(node.taskDef, node.quantum)
        self.assertEqual(task_factory.countExec, 1)

        id_after_skip = self._single_output_ref(butler).id
        self.assertEqual(original_id, id_after_skip)

    def test_clobber_outputs_execute(self) -> None:
        """Run execute() method twice, with clobber_outputs."""
        butler, node = self._make_single_node()

        task_factory = AddTaskFactoryMock()
        first_run = SingleQuantumExecutor(butler, task_factory)
        first_run.execute(node.taskDef, node.quantum)
        self.assertEqual(task_factory.countExec, 1)

        original_id = self._single_output_ref(butler).id

        # With both clobberOutputs and skipExistingIn the quantum is
        # skipped rather than clobbered.
        assert butler.run is not None
        skipping_run = SingleQuantumExecutor(
            butler, task_factory, skipExistingIn=[butler.run], clobberOutputs=True
        )
        skipping_run.execute(node.taskDef, node.quantum)
        self.assertEqual(task_factory.countExec, 1)

        id_after_skip = self._single_output_ref(butler).id
        self.assertEqual(original_id, id_after_skip)

        # With clobberOutputs alone (no skipExistingIn) the existing output
        # is removed and the task runs again, producing a new dataset.
        assert butler.run is not None
        clobbering_run = SingleQuantumExecutor(butler, task_factory, clobberOutputs=True)
        clobbering_run.execute(node.taskDef, node.quantum)
        self.assertEqual(task_factory.countExec, 2)

        id_after_clobber = self._single_output_ref(butler).id
        self.assertNotEqual(original_id, id_after_clobber)


# Run the unittest test runner when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()