Skip to content

Commit

Permalink
Change log setup to a context manager
Browse files Browse the repository at this point in the history
This simplifies the logic for ensuring that logs are always
written to the butler.
  • Loading branch information
timj committed Jul 25, 2021
1 parent 8aa7ef8 commit 68cdca6
Showing 1 changed file with 81 additions and 65 deletions.
146 changes: 81 additions & 65 deletions python/lsst/ctrl/mpexec/singleQuantumExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import sys
import tempfile
import time
from contextlib import contextmanager
from collections import defaultdict
from itertools import chain
from logging import FileHandler
Expand Down Expand Up @@ -103,86 +104,95 @@ def __init__(self, taskFactory, skipExisting=False, clobberOutputs=False, enable
self.log_handler = None

def execute(self, taskDef, quantum, butler):

# Docstring inherited from QuantumExecutor.execute
startTime = time.time()

# Save detailed resource usage before task start to metadata.
quantumMetadata = PropertyList()
logInfo(None, "prep", metadata=quantumMetadata)
with self.captureLogging(taskDef, quantum, butler):
# Save detailed resource usage before task start to metadata.
quantumMetadata = PropertyList()
logInfo(None, "prep", metadata=quantumMetadata)

# Docstring inherited from QuantumExecutor.execute
self.setupLogging(taskDef, quantum)
taskClass, label, config = taskDef.taskClass, taskDef.label, taskDef.config
taskClass, label, config = taskDef.taskClass, taskDef.label, taskDef.config

# check whether to skip or delete old outputs
if self.checkExistingOutputs(quantum, butler, taskDef):
_LOG.info("Skipping already-successful quantum for label=%s dataId=%s.", label, quantum.dataId)
self.writeLogRecords(quantum, taskDef, butler)
return
try:
quantum = self.updatedQuantumInputs(quantum, butler, taskDef)
except NoWorkFound as exc:
_LOG.info("Nothing to do for task '%s' on quantum %s; saving metadata and skipping: %s",
taskDef.label, quantum.dataId, str(exc))
# Make empty metadata that looks something like what a do-nothing
# task would write (but we don't bother with empty nested
# PropertySets for subtasks). This is slightly duplicative with
# logic in pipe_base that we can't easily call from here; we'll fix
# this on DM-29761.
# check whether to skip or delete old outputs
if self.checkExistingOutputs(quantum, butler, taskDef):
_LOG.info("Skipping already-successful quantum for label=%s dataId=%s.", label,
quantum.dataId)
return
try:
quantum = self.updatedQuantumInputs(quantum, butler, taskDef)
except NoWorkFound as exc:
_LOG.info("Nothing to do for task '%s' on quantum %s; saving metadata and skipping: %s",
taskDef.label, quantum.dataId, str(exc))
# Make empty metadata that looks something like what a
# do-nothing task would write (but we don't bother with empty
# nested PropertySets for subtasks). This is slightly
# duplicative with logic in pipe_base that we can't easily call
# from here; we'll fix this on DM-29761.
logInfo(None, "end", metadata=quantumMetadata)
fullMetadata = PropertySet()
fullMetadata[taskDef.label] = PropertyList()
fullMetadata["quantum"] = quantumMetadata
self.writeMetadata(quantum, fullMetadata, taskDef, butler)
return

# enable lsstDebug debugging
if self.enableLsstDebug:
try:
_LOG.debug("Will try to import debug.py")
import debug # noqa:F401
except ImportError:
_LOG.warn("No 'debug' module found.")

# initialize global state
self.initGlobals(quantum, butler)

# Ensure that we are executing a frozen config
config.freeze()
logInfo(None, "init", metadata=quantumMetadata)
task = self.makeTask(taskClass, label, config, butler)
logInfo(None, "start", metadata=quantumMetadata)
try:
self.runQuantum(task, quantum, taskDef, butler)
except Exception:
_LOG.exception("Execution of task '%s' on quantum %s failed",
taskDef.label, quantum.dataId)
raise
logInfo(None, "end", metadata=quantumMetadata)
fullMetadata = PropertySet()
fullMetadata[taskDef.label] = PropertyList()
fullMetadata = task.getFullMetadata()
fullMetadata["quantum"] = quantumMetadata
self.writeMetadata(quantum, fullMetadata, taskDef, butler)
self.writeLogRecords(quantum, taskDef, butler)
return

# enable lsstDebug debugging
if self.enableLsstDebug:
try:
_LOG.debug("Will try to import debug.py")
import debug # noqa:F401
except ImportError:
_LOG.warn("No 'debug' module found.")

# initialize global state
self.initGlobals(quantum, butler)

# Ensure that we are executing a frozen config
config.freeze()
logInfo(None, "init", metadata=quantumMetadata)
task = self.makeTask(taskClass, label, config, butler)
logInfo(None, "start", metadata=quantumMetadata)
try:
self.runQuantum(task, quantum, taskDef, butler)
except Exception:
_LOG.exception("Execution of task '%s' on quantum %s failed",
taskDef.label, quantum.dataId)
self.writeLogRecords(quantum, taskDef, butler)
raise
logInfo(None, "end", metadata=quantumMetadata)
fullMetadata = task.getFullMetadata()
fullMetadata["quantum"] = quantumMetadata
self.writeMetadata(quantum, fullMetadata, taskDef, butler)
stopTime = time.time()
_LOG.info("Execution of task '%s' on quantum %s took %.3f seconds",
taskDef.label, quantum.dataId, stopTime - startTime)

self.writeLogRecords(quantum, taskDef, butler)
stopTime = time.time()
_LOG.info("Execution of task '%s' on quantum %s took %.3f seconds",
taskDef.label, quantum.dataId, stopTime - startTime)

def setupLogging(self, taskDef, quantum):
"""Configure logging system for execution of this task.
Ths method can setup logging to attach task- or
quantum-specific information to log messages. Potentially this can
take into account some info from task configuration as well.
@contextmanager
def captureLogging(self, taskDef, quantum, butler):
"""Configure logging system to capture logs for execution of this task.
Parameters
----------
taskDef : `lsst.pipe.base.TaskDef`
The task definition.
quantum : `~lsst.daf.butler.Quantum`
Single Quantum instance.
butler : `~lsst.daf.butler.Butler`
Butler to write logs to.
Notes
-----
Expected to be used as a context manager to ensure that logging
records are inserted into the butler once the quantum has been
executed:
.. code-block:: py
with self.captureLogging(taskDef, quantum):
# Run quantum and capture logs.
Ths method can also setup logging to attach task- or
quantum-specific information to log messages. Potentially this can
take into account some info from task configuration as well.
"""
# include quantum dataId and task label into MDC
label = taskDef.label
Expand All @@ -209,6 +219,12 @@ def setupLogging(self, taskDef, quantum):

logging.getLogger().addHandler(self.log_handler)

try:
yield
finally:
# Ensure that the logs are stored in butler.
self.writeLogRecords(quantum, taskDef, butler)

def checkExistingOutputs(self, quantum, butler, taskDef):
"""Decide whether this quantum needs to be executed.
Expand Down

0 comments on commit 68cdca6

Please sign in to comment.