Skip to content

Commit

Permalink
Add support for lsstDebug facility (DM-22504)
Browse files Browse the repository at this point in the history
The --debug option existed but did not do anyhting, added code that
imports `debug.py` when that option is set. Compared to CmdLineTask in
`pipetask` I do it in two places, once before PreExecInit (in case
someone needs to debug task construction) and once before running each
task. Latter is not needed if `multiprocess` uses 'fork' start method
but it is impossible to guarantee that is always true on all platforms.
  • Loading branch information
andy-slac committed Dec 13, 2019
1 parent 6230577 commit fb2097b
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 11 deletions.
14 changes: 13 additions & 1 deletion python/lsst/ctrl/mpexec/cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,17 @@ def runPipeline(self, graph, taskFactory, args, butler=None):
if not butler.run:
raise ValueError("no output collection defined in data butler")

# Enable lsstDebug debugging. Note that this is done once in the
# main process before PreExecInit and it is also repeated before
# running each task in SingleQuantumExecutor (which may not be
# needed if `multipocessing` always uses fork start method).
if args.enableLsstDebug:
try:
_LOG.debug("Will try to import debug.py")
import debug # noqa:F401
except ImportError:
_LOG.warn("No 'debug' module found.")

preExecInit = PreExecInit(butler, taskFactory, args.skip_existing, args.clobber_output)
preExecInit.initialize(graph,
saveInitOutputs=not args.skip_init_writes,
Expand All @@ -366,7 +377,8 @@ def runPipeline(self, graph, taskFactory, args, butler=None):
if not args.init_only:
executor = MPGraphExecutor(numProc=args.processes, timeout=self.MP_TIMEOUT,
skipExisting=args.skip_existing,
clobberOutput=args.clobber_output)
clobberOutput=args.clobber_output,
enableLsstDebug=args.enableLsstDebug)
with util.profile(args.profile, _LOG):
executor.execute(graph, butler, taskFactory)

Expand Down
3 changes: 2 additions & 1 deletion python/lsst/ctrl/mpexec/cmdLineParser.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,8 @@ def _makeLoggingOptions(parser):
help="logging level; supported levels are [trace|debug|info|warn|error|fatal]",
metavar="LEVEL|COMPONENT=LEVEL")
group.add_argument("--longlog", action="store_true", help="use a more verbose format for the logging")
group.add_argument("--debug", action="store_true", help="(deprecated) enable debugging output")
group.add_argument("--debug", action="store_true", dest="enableLsstDebug",
help="enable debugging output using lsstDebug facility (imports debug.py)")


def _makePipelineOptions(parser):
Expand Down
10 changes: 10 additions & 0 deletions python/lsst/ctrl/mpexec/examples/calexpToCoaddTask.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from lsst.pipe.base import (Struct, PipelineTask, PipelineTaskConfig,
PipelineTaskConnections)
from lsst.pipe.base import connectionTypes as cT
import lsstDebug

_LOG = logging.getLogger(__name__.partition(".")[2])

Expand Down Expand Up @@ -43,6 +44,15 @@ def run(self, calexp):
"""
_LOG.info("executing %s: calexp=%s", self.getName(), calexp)

# To test lsstDebug function make a debug.py file with this contents
# somewhere in PYTHONPATH and run `pipetask` with --debug option:
#
# import lsstDebug
# lsstDebug.Info('lsst.ctrl.mpexec.examples.calexpToCoaddTask').display = True
#
if lsstDebug.Info(__name__).display:
_LOG.info("%s: display enabled", __name__)

# output data, scalar in this case
data = ExposureF(100, 100)

Expand Down
18 changes: 13 additions & 5 deletions python/lsst/ctrl/mpexec/mpGraphExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,15 @@ class MPGraphExecutor(QuantumGraphExecutor):
clobberOutput : `bool`, optional
It `True` then override all existing output datasets in an output
collection.
enableLsstDebug : `bool`, optional
Enable debugging with ``lsstDebug`` facility for a task.
"""
def __init__(self, numProc, timeout, skipExisting=False, clobberOutput=False):
def __init__(self, numProc, timeout, skipExisting=False, clobberOutput=False, enableLsstDebug=False):
self.numProc = numProc
self.timeout = timeout
self.skipExisting = skipExisting
self.clobberOutput = clobberOutput
self.enableLsstDebug = enableLsstDebug

def execute(self, graph, butler, taskFactory):
# Docstring inherited from QuantumGraphExecutor.execute
Expand All @@ -90,7 +93,8 @@ def _executeQuantaInProcess(self, iterable, butler, taskFactory):
taskDef = qdata.taskDef
self._executePipelineTask(taskDef=taskDef, quantum=qdata.quantum, butler=butler,
taskFactory=taskFactory, skipExisting=self.skipExisting,
clobberOutput=self.clobberOutput)
clobberOutput=self.clobberOutput,
enableLsstDebug=self.enableLsstDebug)

def _executeQuantaMP(self, iterable, butler, taskFactory):
"""Execute all Quanta in separate process pool.
Expand Down Expand Up @@ -135,7 +139,8 @@ def _executeQuantaMP(self, iterable, butler, taskFactory):
# Add it to the pool and remember its result
_LOG.debug("Sumbitting %s", qdata)
kwargs = dict(taskDef=taskDef, quantum=qdata.quantum, butler=butler, taskFactory=taskFactory,
skipExisting=self.skipExisting, clobberOutput=self.clobberOutput)
skipExisting=self.skipExisting, clobberOutput=self.clobberOutput,
enableLsstDebug=self.enableLsstDebug)
results[qdata.index] = pool.apply_async(self._executePipelineTask, (), kwargs)

# Everything is submitted, wait until it's complete
Expand All @@ -148,7 +153,8 @@ def _executeQuantaMP(self, iterable, butler, taskFactory):
res.get(self.timeout)

@staticmethod
def _executePipelineTask(*, taskDef, quantum, butler, taskFactory, skipExisting, clobberOutput):
def _executePipelineTask(*, taskDef, quantum, butler, taskFactory, skipExisting,
clobberOutput, enableLsstDebug):
"""Execute PipelineTask on a single data item.
Parameters
Expand All @@ -166,6 +172,8 @@ def _executePipelineTask(*, taskDef, quantum, butler, taskFactory, skipExisting,
clobberOutput : `bool`, optional
It `True` then override all existing output datasets in an output
collection.
enableLsstDebug : `bool`, optional
Enable debugging with ``lsstDebug`` facility for a task.
"""
executor = SingleQuantumExecutor(butler, taskFactory, skipExisting, clobberOutput)
executor = SingleQuantumExecutor(butler, taskFactory, skipExisting, clobberOutput, enableLsstDebug)
return executor.execute(taskDef, quantum)
14 changes: 13 additions & 1 deletion python/lsst/ctrl/mpexec/singleQuantumExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,15 @@ class SingleQuantumExecutor:
clobberOutput : `bool`, optional
It `True` then override all existing output datasets in an output
collection.
enableLsstDebug : `bool`, optional
Enable debugging with ``lsstDebug`` facility for a task.
"""
def __init__(self, butler, taskFactory, skipExisting=False, clobberOutput=False):
def __init__(self, butler, taskFactory, skipExisting=False, clobberOutput=False, enableLsstDebug=False):
self.butler = butler
self.taskFactory = taskFactory
self.skipExisting = skipExisting
self.clobberOutput = clobberOutput
self.enableLsstDebug = enableLsstDebug

def execute(self, taskDef, quantum):
"""Execute PipelineTask on a single Quantum.
Expand All @@ -80,6 +83,15 @@ def execute(self, taskDef, quantum):
f"task={taskClass.__name__} dataId={quantum.dataId}.")
return
self.updateQuantumInputs(quantum)

# enable lsstDebug debugging
if self.enableLsstDebug:
try:
_LOG.debug("Will try to import debug.py")
import debug # noqa:F401
except ImportError:
_LOG.warn("No 'debug' module found.")

task = self.makeTask(taskClass, config)
self.runQuantum(task, quantum, taskDef)

Expand Down
1 change: 1 addition & 0 deletions tests/test_cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ def _makeArgs(pipeline=None, qgraph=None, pipeline_actions=(), order_pipeline=Fa
args.init_only = False
args.processes = 1
args.profile = None
args.enableLsstDebug = False
return args


Expand Down
6 changes: 3 additions & 3 deletions tests/test_cmdLineParser.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def testCmdLineParser(self):
self.assertRaises(_Error, parser.parse_args)

# know attributes to appear in parser output for all subcommands
common_options = "loglevel longlog debug subcommand subparser".split()
common_options = "loglevel longlog enableLsstDebug subcommand subparser".split()

# test for the set of options defined in each command
args = parser.parse_args(
Expand Down Expand Up @@ -199,7 +199,7 @@ def testCmdLineTasks(self):
""".split())
self.assertFalse(args.clobberConfig)
self.assertFalse(args.clobberVersions)
self.assertFalse(args.debug)
self.assertFalse(args.enableLsstDebug)
self.assertFalse(args.doraise)
self.assertEqual(args.input, {})
self.assertEqual(args.loglevel, [])
Expand Down Expand Up @@ -243,7 +243,7 @@ def testCmdLineTasks(self):
""".split())
self.assertTrue(args.clobberConfig)
self.assertTrue(args.clobberVersions)
self.assertTrue(args.debug)
self.assertTrue(args.enableLsstDebug)
self.assertTrue(args.doraise)
self.assertEqual(args.input, {"": ["inputColl"]})
self.assertEqual(args.loglevel, [(None, 'DEBUG'), ('component', 'TRACE')])
Expand Down

0 comments on commit fb2097b

Please sign in to comment.