Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-31255: Fix log-to-butler failures with --skip-existing #132

Merged
merged 2 commits into from
Jul 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
92 changes: 59 additions & 33 deletions python/lsst/ctrl/mpexec/singleQuantumExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
# Imports of standard modules --
# -------------------------------
import logging
import os
import sys
import tempfile
import time
Expand Down Expand Up @@ -57,6 +58,7 @@
)
from lsst.daf.butler.core.logging import (
ButlerLogRecordHandler,
ButlerLogRecords,
ButlerMDC,
JsonLogFormatter,
)
Expand All @@ -67,6 +69,12 @@
_LOG = logging.getLogger(__name__.partition(".")[2])


class _LogCaptureFlag:
"""Simple flag to enable/disable log-to-butler saving.
"""
store: bool = True


class SingleQuantumExecutor(QuantumExecutor):
"""Executor class which runs one Quantum at a time.

Expand Down Expand Up @@ -109,18 +117,24 @@ def execute(self, taskDef, quantum, butler):
# Docstring inherited from QuantumExecutor.execute
startTime = time.time()

with self.captureLogging(taskDef, quantum, butler):
with self.captureLogging(taskDef, quantum, butler) as captureLog:

# Save detailed resource usage before task start to metadata.
quantumMetadata = PropertyList()
logInfo(None, "prep", metadata=quantumMetadata)

taskClass, label, config = taskDef.taskClass, taskDef.label, taskDef.config

# check whether to skip or delete old outputs
# check whether to skip or delete old outputs, if it returns True
# or raises an exception do not try to store logs, as they may be
# already in butler.
captureLog.store = False
if self.checkExistingOutputs(quantum, butler, taskDef):
_LOG.info("Skipping already-successful quantum for label=%s dataId=%s.", label,
quantum.dataId)
return
captureLog.store = True

try:
quantum = self.updatedQuantumInputs(quantum, butler, taskDef)
except NoWorkFound as exc:
Expand Down Expand Up @@ -219,12 +233,13 @@ def captureLogging(self, taskDef, quantum, butler):
if quantum.dataId:
label += f":{quantum.dataId}"

ctx = _LogCaptureFlag()
try:
with ButlerMDC.set_mdc({"LABEL": label}):
yield
yield ctx
finally:
# Ensure that the logs are stored in butler.
self.writeLogRecords(quantum, taskDef, butler)
self.writeLogRecords(quantum, taskDef, butler, ctx.store)

def checkExistingOutputs(self, quantum, butler, taskDef):
"""Decide whether this quantum needs to be executed.
Expand Down Expand Up @@ -438,7 +453,7 @@ def writeMetadata(self, quantum, metadata, taskDef, butler):
f" and execution") from exc
butler.put(metadata, ref[0])

def writeLogRecords(self, quantum, taskDef, butler):
def writeLogRecords(self, quantum, taskDef, butler, store):
# If we are logging to an external file we must always try to
# close it.
filename = None
Expand All @@ -450,35 +465,46 @@ def writeLogRecords(self, quantum, taskDef, butler):
# Remove the handler so we stop accumulating log messages.
logging.getLogger().removeHandler(self.log_handler)

if taskDef.logOutputDatasetName is not None and self.log_handler is not None:
# DatasetRef has to be in the Quantum outputs, can lookup by name
try:
ref = quantum.outputs[taskDef.logOutputDatasetName]
except LookupError as exc:
raise InvalidQuantumError(
f"Quantum outputs is missing log output dataset type {taskDef.logOutputDatasetName};"
f" this could happen due to inconsistent options between QuantumGraph generation"
f" and execution") from exc

if isinstance(self.log_handler, ButlerLogRecordHandler):
butler.put(self.log_handler.records, ref[0])

# Clear the records in case the handler is reused.
self.log_handler.records.clear()
else:
assert filename is not None, "Somehow unable to extract filename from file handler"

# Need to ingest this file directly into butler.
dataset = FileDataset(path=filename, refs=ref[0])
try:
if store and taskDef.logOutputDatasetName is not None and self.log_handler is not None:
# DatasetRef has to be in the Quantum outputs, can lookup by
# name
try:
ref = quantum.outputs[taskDef.logOutputDatasetName]
except LookupError as exc:
raise InvalidQuantumError(
f"Quantum outputs is missing log output dataset type {taskDef.logOutputDatasetName};"
f" this could happen due to inconsistent options between QuantumGraph generation"
f" and execution") from exc

if isinstance(self.log_handler, ButlerLogRecordHandler):
butler.put(self.log_handler.records, ref[0])

# Clear the records in case the handler is reused.
self.log_handler.records.clear()
else:
assert filename is not None, "Somehow unable to extract filename from file handler"

# Need to ingest this file directly into butler.
dataset = FileDataset(path=filename, refs=ref[0])
try:
butler.ingest(dataset, transfer="move")
filename = None
except NotImplementedError:
# Some datastores can't receive files (e.g. in-memory
# datastore when testing), we store empty list for
# those just to have a dataset. Alternative is to read
# the file as a ButlerLogRecords object and put it.
_LOG.info("Log records could not be stored in this butler because the"
" datastore can not ingest files, empty record list is stored instead.")
records = ButlerLogRecords.from_records([])
butler.put(records, ref[0])
finally:
# remove file if it is not ingested
if filename is not None:
Copy link
Member

@timj timj Jul 29, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about:

if store and other tests:
    # store it

if filename:
    # remove it

would work wouldn't it? Then there would not be the os.remove code duplication logic.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should work, I'll change that.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is also case when ingest fails with something else than NotImplementedError, I think we also need to cleanup in that case.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New commit should fix all problems 🙂

try:
butler.ingest(dataset, transfer="move")
except NotImplementedError:
# Some datastores can't receive files (e.g. in-memory
# datastore when testing) so skip log storage for those.
# Alternative is to read the file as a ButlerLogRecords
# object and put it.
_LOG.info("Log records could not be stored in this butler because the"
" datastore can not ingest files.")
os.remove(filename)
except OSError:
pass

def initGlobals(self, quantum, butler):
Expand Down
14 changes: 10 additions & 4 deletions tests/test_cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ def testSimpleQGraphSkipExisting(self):
"""

nQuanta = 5
butler, qgraph = makeSimpleQGraph(nQuanta, root=self.root)
butler, qgraph = makeSimpleQGraph(nQuanta, root=self.root, inMemory=False)

self.assertEqual(len(qgraph.taskGraph), 5)
self.assertEqual(len(qgraph), nQuanta)
Expand All @@ -425,6 +425,12 @@ def testSimpleQGraphSkipExisting(self):
fwk.runPipeline(qgraph, taskFactory, args, butler=butler)
self.assertEqual(taskFactory.countExec, 3)

# Failed task still makes _log dataset, have to delete this before
# retry if not using clobber-outputs.
ref = butler.registry.findDataset("task3_log", instrument="INSTR", detector=0)
self.assertIsNotNone(ref)
butler.pruneDatasets([ref], disassociate=True, unstore=True, purge=True)

# run remaining ones
taskFactory.stopAt = -1
args.skip_existing = True
Expand All @@ -439,7 +445,7 @@ def testSimpleQGraphOutputsFail(self):
"""

nQuanta = 5
butler, qgraph = makeSimpleQGraph(nQuanta, root=self.root)
butler, qgraph = makeSimpleQGraph(nQuanta, root=self.root, inMemory=False)

# should have one task and number of quanta
self.assertEqual(len(qgraph), nQuanta)
Expand All @@ -455,7 +461,7 @@ def testSimpleQGraphOutputsFail(self):

# drop one of the two outputs from one task
ref1 = butler.registry.findDataset("add2_dataset2", instrument="INSTR", detector=0)
self.assertIsNotNone([ref1])
self.assertIsNotNone(ref1)
# also drop the metadata output
ref2 = butler.registry.findDataset("task1_metadata", instrument="INSTR", detector=0)
self.assertIsNotNone(ref2)
Expand All @@ -475,7 +481,7 @@ def testSimpleQGraphClobberOutputs(self):
"""

nQuanta = 5
butler, qgraph = makeSimpleQGraph(nQuanta, root=self.root)
butler, qgraph = makeSimpleQGraph(nQuanta, root=self.root, inMemory=False)

# should have one task and number of quanta
self.assertEqual(len(qgraph), nQuanta)
Expand Down