Skip to content

Commit

Permalink
Merge pull request #25 from lsst/tickets/DM-20845
Browse files Browse the repository at this point in the history
DM-20845: Support re-run of pipetask on the same output collection
  • Loading branch information
andy-slac committed Sep 18, 2019
2 parents fbcc076 + feda286 commit ae73b45
Show file tree
Hide file tree
Showing 16 changed files with 698 additions and 181 deletions.
26 changes: 17 additions & 9 deletions python/lsst/ctrl/mpexec/cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,9 @@ def makeGraph(self, pipeline, taskFactory, args):
outputOverrides=outputs)

# make execution plan (a.k.a. DAG) for pipeline
graphBuilder = GraphBuilder(taskFactory, butler.registry, args.skip_existing)
graphBuilder = GraphBuilder(taskFactory, butler.registry,
skipExisting=args.skip_existing,
clobberExisting=args.clobber_output)
qgraph = graphBuilder.makeGraph(pipeline, coll, args.data_query)

# count quanta in graph and give a warning if it's empty and return None
Expand All @@ -388,7 +390,7 @@ def makeGraph(self, pipeline, taskFactory, args):

return qgraph

def runPipeline(self, graph, taskFactory, args):
def runPipeline(self, graph, taskFactory, args, butler=None):
"""Execute complete QuantumGraph.
Parameters
Expand All @@ -399,27 +401,33 @@ def runPipeline(self, graph, taskFactory, args):
Task factory.
args : `argparse.Namespace`
Parsed command line
butler : `~lsst.daf.butler.Butler`, optional
Data Butler instance, if not defined then new instance is made
using command line options.
"""
# If default output collection is given then use it to override
# butler-configured one.
run = args.output.get("", None)

# make butler instance
butler = Butler(config=args.butler_config, run=run)
if butler is None:
butler = Butler(config=args.butler_config, run=run)

# at this point we require that output collection was defined
if not butler.run:
raise ValueError("no output collection defined in data butler")

preExecInit = PreExecInit(butler)
preExecInit.initialize(graph, taskFactory,
registerDatasetTypes=args.register_dataset_types,
preExecInit = PreExecInit(butler, taskFactory, args.skip_existing, args.clobber_output)
preExecInit.initialize(graph,
saveInitOutputs=not args.skip_init_writes,
updateOutputCollection=True)
registerDatasetTypes=args.register_dataset_types)

if not args.init_only:
executor = MPGraphExecutor(numProc=args.processes, timeout=self.MP_TIMEOUT)
executor.execute(graph, butler, taskFactory)
executor = MPGraphExecutor(numProc=args.processes, timeout=self.MP_TIMEOUT,
skipExisting=args.skip_existing,
clobberOutput=args.clobber_output)
with util.profile(args.profile, _LOG):
executor.execute(graph, butler, taskFactory)

def showInfo(self, showOpts, pipeline, graph=None):
"""Display useful info about pipeline and environment.
Expand Down
16 changes: 12 additions & 4 deletions python/lsst/ctrl/mpexec/cmdLineParser.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,10 +403,18 @@ def makeParser(fromfile_prefix_chars='@', parser_class=ArgumentParser, **kwargs)
"ordering is performed as last step before saving or executing "
"pipeline.")
if subcommand in ("qgraph", "run"):
subparser.add_argument("--skip-existing", dest="skip_existing",
default=False, action="store_true",
help="If all Quantum outputs already exist in output collection "
"then Qauntum will be excluded from QuantumGraph.")
group = subparser.add_mutually_exclusive_group()
group.add_argument("--skip-existing", dest="skip_existing",
default=False, action="store_true",
help="If all Quantum outputs already exist in output collection "
"then Quantum will be excluded from QuantumGraph.")
group.add_argument("--clobber-output", dest="clobber_output",
default=False, action="store_true",
                           help="Ignore or replace existing output datasets in output collection. "
"With this option existing output datasets are ignored when generating "
"QuantumGraph, and they are removed from a collection prior to "
"executing individual Quanta. This option is exclusive with "
"--skip-existing option.")
subparser.add_argument("-s", "--save-pipeline", dest="save_pipeline",
help="Location for storing a serialized pipeline definition (pickle file).",
metavar="PATH")
Expand Down
38 changes: 18 additions & 20 deletions python/lsst/ctrl/mpexec/examples/calexpToCoaddTask.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,27 @@

from lsst.afw.image import ExposureF
from lsst.pipe.base import (Struct, PipelineTask, PipelineTaskConfig,
InputDatasetField, OutputDatasetField)
PipelineTaskConnections)
from lsst.pipe.base import connectionTypes as cT

_LOG = logging.getLogger(__name__.partition(".")[2])


class CalexpToCoaddTaskConfig(PipelineTaskConfig):
calexp = InputDatasetField(name="calexp",
dimensions=["Instrument", "Visit", "Detector"],
storageClass="ExposureF",
doc="DatasetType for the input image")
coadd = OutputDatasetField(name="deepCoadd_calexp",
dimensions=["SkyMap", "Tract", "Patch", "AbstractFilter"],
storageClass="ExposureF",
scalar=True,
doc="DatasetType for the output image")
class CalexpToCoaddTaskConnections(PipelineTaskConnections,
                                   dimensions=("skymap", "tract", "patch", "abstract_filter")):
    """Connections for CalexpToCoaddTask.

    Quanta are dimensioned per (skymap, tract, patch, abstract_filter);
    each quantum consumes many per-visit-detector calexps and produces a
    single coadd-level output.
    """

    # multiple=True: one coadd quantum gathers every calexp overlapping
    # its patch, so the task's ``run`` receives a sequence of exposures.
    calexp = cT.Input(name="calexp",
                      dimensions=["instrument", "visit", "detector"],
                      multiple=True,
                      storageClass="ExposureF",
                      doc="DatasetType for the input image")
    # Scalar output (no ``multiple``): exactly one coadd per quantum.
    coadd = cT.Output(name="deepCoadd_calexp",
                      dimensions=["skymap", "tract", "patch", "abstract_filter"],
                      storageClass="ExposureF",
                      doc="DatasetType for the output image")

def setDefaults(self):
# set dimensions of a quantum, this task uses per-tract-patch-filter quanta
self.quantum.dimensions = ["SkyMap", "Tract", "Patch", "AbstractFilter"]

class CalexpToCoaddTaskConfig(PipelineTaskConfig, pipelineConnections=CalexpToCoaddTaskConnections):
    """Configuration for CalexpToCoaddTask.

    No extra options; dataset types and quantum dimensions are defined
    entirely by ``CalexpToCoaddTaskConnections``.
    """
    pass


class CalexpToCoaddTask(PipelineTask):
Expand All @@ -32,18 +34,14 @@ class CalexpToCoaddTask(PipelineTask):
ConfigClass = CalexpToCoaddTaskConfig
_DefaultName = 'calexpToCoaddTask'

def adaptArgsAndRun(self, inputData, inputDataIds, outputDataIds, butler):
def run(self, calexp):
"""Operate on in-memory data.
Returns
-------
`Struct` instance with produced result.
"""
# calexps = inputData["calexp"]
calexpDataIds = inputDataIds["calexp"]
coaddDataIds = outputDataIds["coadd"]
_LOG.info("executing %s: calexp=%s coadd=%s",
self.getName(), calexpDataIds, coaddDataIds)
_LOG.info("executing %s: calexp=%s", self.getName(), calexp)

# output data, scalar in this case
data = ExposureF(100, 100)
Expand Down
22 changes: 10 additions & 12 deletions python/lsst/ctrl/mpexec/examples/make_example_qgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
# Imports for other modules --
# -----------------------------
from lsst.daf.butler import DatasetRef, Quantum, Run
from lsst.pipe.base import DatasetTypeDescriptor
from lsst.pipe.base import Pipeline, QuantumGraph, QuantumGraphTaskNodes, TaskDef
from lsst.pipe.base import Pipeline, QuantumGraph, QuantumGraphTaskNodes, TaskDatasetTypes, TaskDef
from lsst.pipe.base.pipeTools import orderPipeline
from lsst.ctrl.mpexec.dotTools import graph2dot, pipeline2dot
from lsst.ctrl.mpexec.examples import test1task, test2task
Expand Down Expand Up @@ -96,26 +95,25 @@ def main():
step2 = _makeStep2TaskDef()
step3 = _makeStep3TaskDef()

dstype0 = DatasetTypeDescriptor.fromConfig(step1.config.input).datasetType
dstype1 = DatasetTypeDescriptor.fromConfig(step1.config.output).datasetType
dstype2 = DatasetTypeDescriptor.fromConfig(step2.config.output).datasetType
dstype3 = DatasetTypeDescriptor.fromConfig(step3.config.output).datasetType
dstypes1 = TaskDatasetTypes.fromConnections(step1.connections)
dstypes2 = TaskDatasetTypes.fromConnections(step2.connections)
dstypes3 = TaskDatasetTypes.fromConnections(step3.connections)

# quanta for first step which is 1-to-1 tasks
quanta = []
for visit in range(10):
quantum = Quantum(run=run, task=None)
quantum.addPredictedInput(_makeDSRefVisit(dstype0, visit))
quantum.addOutput(_makeDSRefVisit(dstype1, visit))
quantum.addPredictedInput(_makeDSRefVisit(dstypes1.inputs[0], visit))
quantum.addOutput(_makeDSRefVisit(dstypes1.outputs[0], visit))
quanta.append(quantum)
step1nodes = QuantumGraphTaskNodes(step1, quanta)

# quanta for second step which is 1-to-1 tasks
quanta = []
for visit in range(10):
quantum = Quantum(run=run, task=None)
quantum.addPredictedInput(_makeDSRefVisit(dstype1, visit))
quantum.addOutput(_makeDSRefVisit(dstype2, visit))
quantum.addPredictedInput(_makeDSRefVisit(dstypes2.inputs[0], visit))
quantum.addOutput(_makeDSRefVisit(dstypes2.outputs[0], visit))
quanta.append(quantum)
step2nodes = QuantumGraphTaskNodes(step2, quanta)

Expand All @@ -130,8 +128,8 @@ def main():
for tract, patch, visits in patch2visits:
quantum = Quantum(run=run, task=None)
for visit in visits:
quantum.addPredictedInput(_makeDSRefVisit(dstype2, visit))
quantum.addOutput(_makeDSRefPatch(dstype3, tract, patch))
quantum.addPredictedInput(_makeDSRefVisit(dstypes3.inputs[0], visit))
quantum.addOutput(_makeDSRefPatch(dstypes3.outputs[0], tract, patch))
quanta.append(quantum)
step3nodes = QuantumGraphTaskNodes(step3, quanta)

Expand Down
44 changes: 22 additions & 22 deletions python/lsst/ctrl/mpexec/examples/patchSkyMapTask.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,32 @@
import logging

from lsst.pipe.base import (Struct, PipelineTask, PipelineTaskConfig,
InputDatasetField, OutputDatasetField)
PipelineTaskConnections)
from lsst.pipe.base import connectionTypes as cT

_LOG = logging.getLogger(__name__.partition(".")[2])


class PatchSkyMapTaskConfig(PipelineTaskConfig):
coadd = InputDatasetField(name="deepCoadd_calexp",
dimensions=["SkyMap", "Tract", "Patch", "AbstractFilter"],
storageClass="ExposureF",
scalar=True,
doc="DatasetType for the input image")
inputCatalog = InputDatasetField(name="deepCoadd_mergeDet",
dimensions=["SkyMap", "Tract", "Patch"],
storageClass="SourceCatalog",
scalar=True,
doc="DatasetType for the input catalog (merged detections).")
outputCatalog = OutputDatasetField(name="deepCoadd_meas",
dimensions=["SkyMap", "Tract", "Patch", "AbstractFilter"],
storageClass="SourceCatalog",
scalar=True,
doc=("DatasetType for the output catalog "
"(deblended per-band measurements)"))

def setDefaults(self):
# set dimensions of a quantum, this task uses per-tract-patch-filter quanta
self.quantum.dimensions = ["SkyMap", "Tract", "Patch", "AbstractFilter"]
class PatchSkyMapTaskConnections(PipelineTaskConnections,
                                 dimensions=("skymap", "tract", "patch", "abstract_filter")):
    """Connections for PatchSkyMapTask.

    One quantum per (skymap, tract, patch, abstract_filter): it reads a
    coadd image and the merged-detections catalog for that patch, and
    writes a per-band measurement catalog.
    """

    # All connections are scalar (no ``multiple``): one dataset of each
    # type per quantum.
    coadd = cT.Input(name="deepCoadd_calexp",
                     dimensions=["skymap", "tract", "patch", "abstract_filter"],
                     storageClass="ExposureF",
                     doc="DatasetType for the input image")
    # Note: the catalog input has no abstract_filter dimension — the
    # merged detections are shared across bands.
    inputCatalog = cT.Input(name="deepCoadd_mergeDet",
                            dimensions=["skymap", "tract", "patch"],
                            storageClass="SourceCatalog",
                            doc="DatasetType for the input catalog (merged detections).")
    outputCatalog = cT.Output(name="deepCoadd_meas",
                              dimensions=["skymap", "tract", "patch", "abstract_filter"],
                              storageClass="SourceCatalog",
                              doc=("DatasetType for the output catalog "
                                   "(deblended per-band measurements)"))


class PatchSkyMapTaskConfig(PipelineTaskConfig,
                            pipelineConnections=PatchSkyMapTaskConnections):
    """Configuration for PatchSkyMapTask.

    No extra options; dataset types and quantum dimensions are defined
    entirely by ``PatchSkyMapTaskConnections``.
    """
    pass


class PatchSkyMapTask(PipelineTask):
Expand Down
33 changes: 18 additions & 15 deletions python/lsst/ctrl/mpexec/examples/rawToCalexpTask.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,28 @@

from lsst.afw.image import ExposureF
from lsst.pipe.base import (Struct, PipelineTask, PipelineTaskConfig,
InputDatasetField, OutputDatasetField)
PipelineTaskConnections)
from lsst.pipe.base import connectionTypes as cT

_LOG = logging.getLogger(__name__.partition(".")[2])


class RawToCalexpTaskConfig(PipelineTaskConfig):
input = InputDatasetField(name="raw",
dimensions=["instrument", "exposure", "detector"],
storageClass="ExposureU",
doc="Input dataset type for this task")
output = OutputDatasetField(name="calexp",
dimensions=["instrument", "visit", "detector"],
storageClass="ExposureF",
scalar=True,
doc="Output dataset type for this task")

def setDefaults(self):
# set dimensions of a quantum, this task uses per-visit-detector quanta
self.quantum.dimensions = ["instrument", "visit", "detector"]
class RawToCalexpTaskConnections(PipelineTaskConnections,
                                 dimensions=("instrument", "visit", "detector")):
    """Connections for RawToCalexpTask.

    One quantum per (instrument, visit, detector): raw exposures in,
    one calexp out.
    """

    # multiple=True: a visit may span several exposures (snaps), so the
    # task receives a sequence of raws per quantum.
    # NOTE(review): storageClass here is "Exposure" while the previous
    # version of this connection used "ExposureU" — confirm the change
    # is intentional.
    input = cT.Input(name="raw",
                     dimensions=["instrument", "exposure", "detector"],
                     multiple=True,
                     storageClass="Exposure",
                     doc="Input dataset type for this task")
    output = cT.Output(name="calexp",
                       dimensions=["instrument", "visit", "detector"],
                       storageClass="ExposureF",
                       doc="Output dataset type for this task")


class RawToCalexpTaskConfig(PipelineTaskConfig,
                            pipelineConnections=RawToCalexpTaskConnections):
    """Configuration for RawToCalexpTask.

    No extra options; dataset types and quantum dimensions are defined
    entirely by ``RawToCalexpTaskConnections``.
    """
    pass


class RawToCalexpTask(PipelineTask):
Expand Down
33 changes: 16 additions & 17 deletions python/lsst/ctrl/mpexec/examples/test1task.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,26 @@
import logging

from lsst.pipe.base import (Struct, PipelineTask, PipelineTaskConfig,
InputDatasetField, OutputDatasetField)
PipelineTaskConnections)
from lsst.pipe.base import connectionTypes as cT

_LOG = logging.getLogger(__name__.partition(".")[2])


class Test1Config(PipelineTaskConfig):
input = InputDatasetField(name="input",
dimensions=["Instrument", "Visit"],
storageClass="example",
scalar=True,
doc="Input dataset type for this task")
output = OutputDatasetField(name="output",
dimensions=["Instrument", "Visit"],
storageClass="example",
scalar=True,
doc="Output dataset type for this task")

def setDefaults(self):
# set dimensions of a quantum, this task uses per-visit quanta and it
# expects datset dimensions to be the same
self.quantum.dimensions = ["Instrument", "Visit"]
class Test1Connections(PipelineTaskConnections,
                       dimensions=("instrument", "visit")):
    """Connections for Test1Task.

    A 1-to-1 example task: one quantum per (instrument, visit), reading
    one "input" dataset and writing one "output" dataset with the same
    dimensions.
    """

    input = cT.Input(name="input",
                     dimensions=["instrument", "visit"],
                     storageClass="example",
                     doc="Input dataset type for this task")
    output = cT.Output(name="output",
                       dimensions=["instrument", "visit"],
                       storageClass="example",
                       doc="Output dataset type for this task")


class Test1Config(PipelineTaskConfig, pipelineConnections=Test1Connections):
    """Configuration for Test1Task.

    No extra options; dataset types and quantum dimensions are defined
    entirely by ``Test1Connections``.
    """
    pass


class Test1Task(PipelineTask):
Expand Down
34 changes: 17 additions & 17 deletions python/lsst/ctrl/mpexec/examples/test2task.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,27 @@
import logging

from lsst.pipe.base import (Struct, PipelineTask, PipelineTaskConfig,
InputDatasetField, OutputDatasetField)
PipelineTaskConnections)
from lsst.pipe.base import connectionTypes as cT

_LOG = logging.getLogger(__name__.partition(".")[2])


class Test2Config(PipelineTaskConfig):
input = InputDatasetField(name="input",
dimensions=["Instrument", "Visit"],
storageClass="example",
doc="Input dataset type for this task")
output = OutputDatasetField(name="output",
dimensions=["Tract", "Patch"],
storageClass="example",
scalar=True,
doc="Output dataset type for this task")

def setDefaults(self):
# this task combines all selected visits into a tract/patch, on
# input it expects per-visit data, on output it produces per-patch.
# Combining visits "destroys" Visit dimension in a quantum.
self.quantum.dimensions = ["Instrument", "Tract", "Patch"]
class Test2Connections(PipelineTaskConnections,
                       dimensions=("instrument", "tract", "patch")):
    """Connections for Test2Task.

    A many-to-1 example task: each (instrument, tract, patch) quantum
    combines multiple per-visit inputs into a single per-patch output,
    so the visit dimension does not appear in the quantum dimensions.
    """

    # multiple=True: all selected visits feed a single patch quantum.
    input = cT.Input(name="input",
                     dimensions=["instrument", "visit"],
                     multiple=True,
                     storageClass="example",
                     doc="Input dataset type for this task")
    output = cT.Output(name="output",
                       dimensions=["tract", "patch"],
                       storageClass="example",
                       doc="Output dataset type for this task")


class Test2Config(PipelineTaskConfig, pipelineConnections=Test2Connections):
    """Configuration for Test2Task.

    No extra options; dataset types and quantum dimensions are defined
    entirely by ``Test2Connections``.
    """
    pass


class Test2Task(PipelineTask):
Expand Down

0 comments on commit ae73b45

Please sign in to comment.