Skip to content

Commit

Permalink
Add support for --clobber-output option
Browse files Browse the repository at this point in the history
  • Loading branch information
andy-slac committed Sep 17, 2019
1 parent e4f117b commit 02eafa4
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 41 deletions.
9 changes: 6 additions & 3 deletions python/lsst/ctrl/mpexec/cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,9 @@ def makeGraph(self, pipeline, taskFactory, args):
outputOverrides=outputs)

# make execution plan (a.k.a. DAG) for pipeline
graphBuilder = GraphBuilder(taskFactory, butler.registry, args.skip_existing)
graphBuilder = GraphBuilder(taskFactory, butler.registry,
skipExisting=args.skip_existing,
clobberExisting=args.clobber_output)
qgraph = graphBuilder.makeGraph(pipeline, coll, args.data_query)

# count quanta in graph and give a warning if it's empty and return None
Expand Down Expand Up @@ -415,14 +417,15 @@ def runPipeline(self, graph, taskFactory, args, butler=None):
if not butler.run:
raise ValueError("no output collection defined in data butler")

preExecInit = PreExecInit(butler, taskFactory, args.skip_existing)
preExecInit = PreExecInit(butler, taskFactory, args.skip_existing, args.clobber_output)
preExecInit.initialize(graph,
saveInitOutputs=not args.skip_init_writes,
registerDatasetTypes=args.register_dataset_types)

if not args.init_only:
executor = MPGraphExecutor(numProc=args.processes, timeout=self.MP_TIMEOUT,
skipExisting=args.skip_existing)
skipExisting=args.skip_existing,
clobberOutput=args.clobber_output)
with util.profile(args.profile, _LOG):
executor.execute(graph, butler, taskFactory)

Expand Down
16 changes: 12 additions & 4 deletions python/lsst/ctrl/mpexec/cmdLineParser.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,10 +403,18 @@ def makeParser(fromfile_prefix_chars='@', parser_class=ArgumentParser, **kwargs)
"ordering is performed as last step before saving or executing "
"pipeline.")
if subcommand in ("qgraph", "run"):
subparser.add_argument("--skip-existing", dest="skip_existing",
default=False, action="store_true",
help="If all Quantum outputs already exist in output collection "
"then Quantum will be excluded from QuantumGraph.")
group = subparser.add_mutually_exclusive_group()
group.add_argument("--skip-existing", dest="skip_existing",
default=False, action="store_true",
help="If all Quantum outputs already exist in output collection "
"then Quantum will be excluded from QuantumGraph.")
group.add_argument("--clobber-output", dest="clobber_output",
default=False, action="store_true",
help="Ignore or replace existing output datasets in output collecton. "
"With this option existing output datasets are ignored when generating "
"QuantumGraph, and they are removed from a collection prior to "
"executing individual Quanta. This option is exclusive with "
"--skip-existing option.")
subparser.add_argument("-s", "--save-pipeline", dest="save_pipeline",
help="Location for storing a serialized pipeline definition (pickle file).",
metavar="PATH")
Expand Down
18 changes: 13 additions & 5 deletions python/lsst/ctrl/mpexec/mpGraphExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,15 @@ class MPGraphExecutor(QuantumGraphExecutor):
Time in seconds to wait for tasks to finish.
skipExisting : `bool`, optional
If True then quanta with all existing outputs are not executed.
clobberOutput : `bool`, optional
If `True` then override all existing output datasets in an output
collection.
"""
def __init__(self, numProc, timeout, skipExisting=False, clobberOutput=False):
    # Parameters are documented in the class docstring.  ``clobberOutput``
    # defaults to `False`, so existing callers are unaffected.
    self.numProc = numProc              # number of worker processes
    self.timeout = timeout              # per-result wait timeout, seconds
    self.skipExisting = skipExisting    # skip quanta whose outputs all exist
    self.clobberOutput = clobberOutput  # remove pre-existing outputs instead

def execute(self, graph, butler, taskFactory):
# Docstring inherited from QuantumGraphExecutor.execute
Expand All @@ -85,7 +89,7 @@ def _executeQuantaInProcess(self, iterable, butler, taskFactory):
_LOG.debug("Executing %s", qdata)
taskDef = qdata.taskDef
self._executePipelineTask(taskDef.taskClass, taskDef.config, qdata.quantum,
butler, taskFactory, self.skipExisting)
butler, taskFactory, self.skipExisting, self.clobberOutput)

def _executeQuantaMP(self, iterable, butler, taskFactory):
"""Execute all Quanta in separate process pool.
Expand Down Expand Up @@ -129,7 +133,8 @@ def _executeQuantaMP(self, iterable, butler, taskFactory):

# Add it to the pool and remember its result
_LOG.debug("Sumbitting %s", qdata)
args = (taskDef.taskClass, taskDef.config, qdata.quantum, butler, taskFactory, self.skipExisting)
args = (taskDef.taskClass, taskDef.config, qdata.quantum, butler, taskFactory,
self.skipExisting, self.clobberOutput)
results[qdata.index] = pool.apply_async(self._executePipelineTask, args)

# Everything is submitted, wait until it's complete
Expand All @@ -142,7 +147,7 @@ def _executeQuantaMP(self, iterable, butler, taskFactory):
res.get(self.timeout)

@staticmethod
def _executePipelineTask(taskClass, config, quantum, butler, taskFactory, skipExisting, clobberOutput):
    """Execute PipelineTask on a single data item.

    Parameters
    ----------
    taskClass : `type`
        Sub-class of PipelineTask.
    config : `~lsst.pipe.base.PipelineTaskConfig`
        Configuration object for this task.
    quantum : `~lsst.daf.butler.Quantum`
        Single Quantum instance.
    butler : `~lsst.daf.butler.Butler`
        Data butler instance.
    taskFactory : `~lsst.pipe.base.TaskFactory`
        Task factory.
    skipExisting : `bool`
        If `True` then quanta with all existing outputs are not executed.
    clobberOutput : `bool`
        If `True` then override all existing output datasets in an output
        collection.
    """
    # Static so it can be pickled and submitted to a multiprocessing pool;
    # the skip/clobber policy is forwarded so every worker applies the
    # same behavior as the in-process path.
    executor = SingleQuantumExecutor(butler, taskFactory, skipExisting, clobberOutput)
    return executor.execute(taskClass, config, quantum)
41 changes: 31 additions & 10 deletions python/lsst/ctrl/mpexec/preExecInit.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,15 @@ class PreExecInit:
If `True` then do not try to overwrite any datasets that might exist
in the butler. If `False` then any existing conflicting dataset will
cause butler exception.
clobberOutput : `bool`, optional
If `True` then override all existing output datasets in an output
collection.
"""
def __init__(self, butler, taskFactory, skipExisting=False, clobberOutput=False):
    # Parameters are documented in the class docstring.  ``clobberOutput``
    # is keyword-defaulted to `False` for backward compatibility.
    self.butler = butler
    self.taskFactory = taskFactory
    self.skipExisting = skipExisting
    self.clobberOutput = clobberOutput

def initialize(self, graph, saveInitOutputs=True, registerDatasetTypes=False):
"""Perform all initialization steps.
Expand Down Expand Up @@ -164,18 +168,25 @@ def _refComponents(refs):

_LOG.debug("Associating %d datasets with output collection %s", len(id2ref), collection)

refs = []
if not self.skipExisting:
refsToAdd = []
refsToRemove = []
if not self.skipExisting and not self.clobberOutput:
# optimization - save all at once, butler will raise an exception
# if any dataset is already there
refs = list(id2ref.values())
refsToAdd = list(id2ref.values())
else:
# skip existing ones
# skip or override existing ones
for ref in id2ref.values():
if registry.find(collection, ref.datasetType, ref.dataId) is None:
refs.append(ref)
if refs:
registry.associate(collection, refs)
refsToAdd.append(ref)
elif self.clobberOutput:
# replace this dataset
refsToRemove.append(ref)
refsToAdd.append(ref)
if refsToRemove:
registry.disassociate(collection, refsToRemove)
if refsToAdd:
registry.associate(collection, refsToAdd)

def saveInitOutputs(self, graph):
"""Write any datasets produced by initializing tasks in a graph.
Expand Down Expand Up @@ -212,13 +223,23 @@ def saveInitOutputs(self, graph):
attribute = getattr(taskDef.connections, name)
initOutputVar = getattr(task, name)
objFromStore = None
if self.skipExisting:
if self.clobberOutput:
# Remove if it already exists.
collection = self.butler.run.collection
registry = self.butler.registry
ref = registry.find(collection, attribute.name, {})
if ref is not None:
# It is not enough to remove dataset from collection,
# it has to be removed from butler too.
self.butler.remove(ref)
elif self.skipExisting:
# check if it is there already
_LOG.debug("Retrieving InitOutputs for task=%s key=%s dsTypeName=%s",
task, name, attribute.name)
objFromStore = self.butler.get(attribute.name, {})
if objFromStore is not None:
# types are supposed to be identical
# Types are supposed to be identical.
# TODO: Check that object contents are identical too.
if type(objFromStore) is not type(initOutputVar):
raise TypeError(f"Stored initOutput object type {type(objFromStore)} "
f"is different from task-generated type "
Expand Down
29 changes: 28 additions & 1 deletion python/lsst/ctrl/mpexec/singleQuantumExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,15 @@ class SingleQuantumExecutor:
Instance of a task factory.
skipExisting : `bool`, optional
If True then quanta with all existing outputs are not executed.
clobberOutput : `bool`, optional
If `True` then override all existing output datasets in an output
collection.
"""
def __init__(self, butler, taskFactory, skipExisting=False, clobberOutput=False):
    # Parameters are documented in the class docstring.  ``clobberOutput``
    # is keyword-defaulted to `False` for backward compatibility.
    self.butler = butler
    self.taskFactory = taskFactory
    self.skipExisting = skipExisting
    self.clobberOutput = clobberOutput

def execute(self, taskClass, config, quantum):
"""Execute PipelineTask on a single Quantum.
Expand All @@ -70,6 +74,8 @@ def execute(self, taskClass, config, quantum):
Single Quantum instance.
"""
self.setupLogging(taskClass, config, quantum)
if self.clobberOutput:
self.doClobberOutputs(quantum)
if self.skipExisting and self.quantumOutputsExist(quantum):
_LOG.info("Quantum execution skipped due to existing outputs.")
return
Expand Down Expand Up @@ -101,6 +107,27 @@ def setupLogging(self, taskClass, config, quantum):
else:
Log.MDC("LABEL", '[' + ', '.join([str(dataId) for dataId in dataIds]) + ']')

def doClobberOutputs(self, quantum):
    """Delete any outputs that already exist for a Quantum.

    Parameters
    ----------
    quantum : `~lsst.daf.butler.Quantum`
        Quantum to check for existing outputs.
    """
    collection = self.butler.run.collection
    registry = self.butler.registry

    # Collect refs for every predicted output already present in the
    # output collection.
    existingRefs = []
    for datasetRefs in quantum.outputs.values():
        for datasetRef in datasetRefs:
            ref = registry.find(collection, datasetRef.datasetType, datasetRef.dataId)
            if ref is not None:
                existingRefs.append(ref)
    # Remove via the butler (not just the registry) so the stored dataset
    # itself is deleted, not only its collection membership.
    for ref in existingRefs:
        _LOG.debug("Removing existing dataset: %s", ref)
        self.butler.remove(ref)

def quantumOutputsExist(self, quantum):
"""Decide whether this quantum needs to be executed.
Expand Down
56 changes: 40 additions & 16 deletions tests/testUtil.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,19 @@ def put(self, obj, datasetRefOrType, dataId=None, producer=None, **kwds):
ref = DatasetRef(datasetType, dataId, id=refId)
return ref

def remove(self, datasetRefOrType, dataId=None, *, delete=True, remember=True, **kwds):
# Mock of Butler.remove for tests: drops the in-memory dataset value and
# updates the mock registry.  NOTE(review): ``delete`` is accepted for
# API compatibility but ignored — the stored value is always deleted.
datasetType, dataId = self._standardizeArgs(datasetRefOrType, dataId, **kwds)
_LOG.info("butler.remove: datasetType=%s dataId=%s", datasetType.name, dataId)
dsTypeName = datasetType.name
key = self.key(dataId)
# NOTE(review): assumes the dataset exists — ``datasets.get`` returns
# None for an unknown type and the ``del`` below would then raise;
# confirm callers only remove datasets they know are present.
dsdata = self.datasets.get(dsTypeName)
del dsdata[key]
# Look up the registry ref (registry is untouched by the del above).
ref = self.registry.find(self.run.collection, datasetType, dataId, **kwds)
if remember:
    # Keep the dataset known to the registry, only detach it from the
    # run collection.
    self.registry.disassociate(self.run.collection, [ref])
else:
    # Forget the dataset entirely.
    self.registry.removeDataset(ref)


def registerDatasetTypes(registry, pipeline):
"""Register all dataset types used by tasks in a registry.
Expand All @@ -193,7 +206,7 @@ def registerDatasetTypes(registry, pipeline):
registry.registerDatasetType(datasetType)


def makeSimpleQGraph(nQuanta=5, pipeline=None):
def makeSimpleQGraph(nQuanta=5, pipeline=None, butler=None, skipExisting=False, clobberExisting=False):
"""Make simple QuantumGraph for tests.
Makes simple one-task pipeline with AddTask, sets up in-memory
Expand All @@ -207,6 +220,15 @@ def makeSimpleQGraph(nQuanta=5, pipeline=None):
pipeline : `~lsst.pipe.base.Pipeline`
If `None` then one-task pipeline is made with `AddTask` and
default `AddTaskConfig`.
butler : `~lsst.daf.butler.Butler`, optional
Data butler instance, this should be an instance returned from a
previous call to this method.
skipExisting : `bool`, optional
If `True` (default), a Quantum is not created if all its outputs
already exist.
clobberExisting : `bool`, optional
If `True`, overwrite any outputs that already exist. Cannot be
`True` if ``skipExisting`` is.
Returns
-------
Expand All @@ -216,30 +238,32 @@ def makeSimpleQGraph(nQuanta=5, pipeline=None):
Quantum graph instance
"""

butler = ButlerMock(fullRegistry=True)

if pipeline is None:
taskDef = pipeBase.TaskDef("AddTask", AddTaskConfig(), taskClass=AddTask, label="task1")
pipeline = pipeBase.Pipeline([taskDef])

# Add dataset types to registry
registerDatasetTypes(butler.registry, pipeline)
if butler is None:

butler = ButlerMock(fullRegistry=True)

# Add dataset types to registry
registerDatasetTypes(butler.registry, pipeline)

# Small set of DataIds included in QGraph
dataIds = [dict(instrument="INSTR", detector=detector) for detector in range(nQuanta)]
# Small set of DataIds included in QGraph
dataIds = [dict(instrument="INSTR", detector=detector) for detector in range(nQuanta)]

# Add all needed dimensions to registry
butler.registry.addDimensionEntry("instrument", dict(instrument="INSTR"))
butler.registry.addDimensionEntryList("detector", dataIds)
# Add all needed dimensions to registry
butler.registry.addDimensionEntry("instrument", dict(instrument="INSTR"))
butler.registry.addDimensionEntryList("detector", dataIds)

# Add inputs to butler, inputs a simply integers, remeber their refs
inputRefs = []
for i, dataId in enumerate(dataIds):
data = numpy.array([i, 10*i])
inputRefs.append(butler.put(data, "add_input", dataId))
# Add inputs to butler
for i, dataId in enumerate(dataIds):
data = numpy.array([i, 10*i])
butler.put(data, "add_input", dataId)

# Make the graph, task factory is not needed here
builder = pipeBase.GraphBuilder(taskFactory=None, registry=butler.registry)
builder = pipeBase.GraphBuilder(taskFactory=None, registry=butler.registry,
skipExisting=skipExisting, clobberExisting=clobberExisting)
originInfo = DatasetOriginInfoDef([butler.run.collection], butler.run.collection)
qgraph = builder.makeGraph(pipeline, originInfo, "")

Expand Down
32 changes: 32 additions & 0 deletions tests/test_cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ def _makeArgs(pipeline=None, qgraph=None, pipeline_actions=(), order_pipeline=Fa
args.register_dataset_types = False
args.skip_init_writes = False
args.skip_existing = False
args.clobber_output = False
args.init_only = False
args.processes = 1
args.profile = None
Expand Down Expand Up @@ -294,6 +295,37 @@ def testSimpleQGraphSkipExisting(self):
fwk.runPipeline(qgraph, taskFactory, args, butler=butler)
self.assertEqual(AddTask.countExec, nQuanta)

def testSimpleQGraphClobberOutput(self):
"""Test re-execution of trivial quantum graph with --clobber-output.
"""

nQuanta = 5
butler, qgraph = makeSimpleQGraph(nQuanta)

# should have one task and number of quanta
self.assertEqual(len(qgraph), 1)
self.assertEqual(qgraph.countQuanta(), nQuanta)

args = _makeArgs()
fwk = CmdLineFwk()
taskFactory = AddTaskFactoryMock()

# run whole thing; countExec is a class-level counter on AddTask
AddTask.stopAt = -1
AddTask.countExec = 0
fwk.runPipeline(qgraph, taskFactory, args, butler=butler)
self.assertEqual(AddTask.countExec, nQuanta)

# repeat with clobbering: existing outputs are replaced, so every
# quantum executes again (counter doubles) instead of being skipped
args.clobber_output = True
fwk.runPipeline(qgraph, taskFactory, args, butler=butler)
self.assertEqual(AddTask.countExec, 2*nQuanta)

# rebuild graph with clobber option, should make same graph
butler, qgraph = makeSimpleQGraph(nQuanta, butler=butler, clobberExisting=True)
self.assertEqual(len(qgraph), 1)
self.assertEqual(qgraph.countQuanta(), nQuanta)


class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase):
pass
Expand Down

0 comments on commit 02eafa4

Please sign in to comment.