Modify metrics and error handling.
The new code extracts metrics on exit or failure instead of after each
pipeline step, which for the moment lets us query the task object
directly for its metadata. This may need to be reworked depending on how
parallel processing is handled.

This commit makes one behavior change: it now tries to extract metrics
even from a failed pipeline step. If that extraction raises an
exception, it suppresses any exception raised by the pipeline itself.
This is an acceptable cost for the time being.
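
For illustration, the recovery pattern described above reduces to a try/finally around the pipeline steps. A minimal, self-contained sketch; `run_steps`, `extract_metrics`, and `run_pipeline` are hypothetical stand-ins for the real calls in pipeline_driver.py, not part of the module:

```python
# Sketch of the metrics-recovery pattern: collect metrics once, in a
# `finally` clause, so partial results survive a mid-pipeline failure.

def run_steps(task):
    """Run every pipeline step; may raise partway through."""
    task["completed"].append("processCcd")
    raise RuntimeError("differencing failed")


def extract_metrics(task, job):
    """Copy whatever metadata the task accumulated into the metrics job."""
    job.update({step: "measured" for step in task["completed"]})


def run_pipeline(job):
    task = {"completed": []}
    try:
        run_steps(task)
    finally:
        # Runs on success *and* on failure, so metrics from completed steps
        # are recovered even when a later step raises.  Caveat: if this call
        # itself raises, its exception replaces the pipeline's own.
        extract_metrics(task, job)


if __name__ == "__main__":
    job = {}
    try:
        run_pipeline(job)
    except RuntimeError as e:
        print("pipeline failed:", e)
    print("metrics recovered:", job)   # {'processCcd': 'measured'}
```
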
kfindeisen committed Mar 22, 2018
1 parent 58e041f commit f723aaa
Showing 1 changed file with 20 additions and 71 deletions.
python/lsst/ap/verify/pipeline_driver.py: 91 changes (20 additions & 71 deletions)
@@ -36,13 +36,11 @@
import argparse
import os
import re
from functools import wraps
from future.utils import raise_from

import json

import lsst.log
import lsst.daf.base as dafBase
import lsst.ap.pipe as apPipe
from lsst.verify import Job

@@ -109,40 +107,6 @@ def _updateMetrics(metadata, job):
e)


def _MetricsRecovery(pipelineStep):
"""Carry out a pipeline step while handling metrics defensively.
Parameters
----------
pipelineStep: callable
The pipeline step to decorate. Must return metadata from the task(s)
executed, or `None`.
Returns
-------
A callable that expects a `verify.Job` as its first parameter,
followed by the arguments to `pipelineStep`, in order. Its
behavior shall be to execute `pipelineStep`, update the `Job`
object with any metrics produced by `pipelineStep`, and return
(possibly empty) metadata.
The returned callable shall raise `pipeline.MeasurementStorageError`
if measurements were made, but the `Job` object could not be
updated with them. Any side effects of `pipelineStep` shall
remain in effect in the event of this exception.
"""
@wraps(pipelineStep)
def wrapper(job, *args, **kwargs):
metadata = pipelineStep(*args, **kwargs)
if metadata is None:
metadata = dafBase.PropertySet()

_updateMetrics(metadata, job)
return metadata
return wrapper


@_MetricsRecovery
def _process(pipeline, workspace, dataId, parallelization):
"""Run single-frame processing on a dataset.
@@ -157,17 +121,11 @@ def _process(pipeline, workspace, dataId, parallelization):
task(s).
parallelization : `int`
Parallelization level at which to run underlying task(s).
Returns
-------
metadata : `lsst.daf.base.PropertySet`
The full metadata from any Tasks called by this method, or `None`.
"""
dataRef = workspace.workButler.dataRef('raw', **dataId)
return pipeline.runProcessCcd(dataRef).fullMetadata
pipeline.runProcessCcd(dataRef)


@_MetricsRecovery
def _difference(pipeline, workspace, dataId, parallelization):
"""Run image differencing on a dataset.
@@ -182,17 +140,11 @@ def _difference(pipeline, workspace, dataId, parallelization):
task(s).
parallelization : `int`
Parallelization level at which to run underlying task(s).
Returns
-------
metadata : `lsst.daf.base.PropertySet`
The full metadata from any Tasks called by this method, or `None`.
"""
dataRef = workspace.workButler.dataRef('calexp', **dataId)
return pipeline.runDiffIm(dataRef).fullMetadata
pipeline.runDiffIm(dataRef)


@_MetricsRecovery
def _associate(pipeline, workspace, dataId, parallelization):
"""Run source association on a dataset.
@@ -207,14 +159,9 @@ def _associate(pipeline, workspace, dataId, parallelization):
task(s).
parallelization : `int`
Parallelization level at which to run underlying task(s).
Returns
-------
metadata : `lsst.daf.base.PropertySet`
The full metadata from any Tasks called by this method, or `None`.
"""
dataRef = workspace.workButler.dataRef('calexp', **dataId)
return pipeline.runDiffIm(dataRef).fullMetadata
pipeline.runDiffIm(dataRef)


def _postProcess(workspace):
@@ -251,29 +198,31 @@ def runApPipe(metricsJob, workspace, parsedCmdLine):
------
`lsst.ap.verify.pipeline_driver.MeasurementStorageError`
Measurements were made, but `metricsJob` could not be updated
with all of them.
with all of them. This exception may suppress exceptions raised by
the pipeline itself.
"""
log = lsst.log.Log.getLogger('ap.verify.pipeline_driver.runApPipe')

metadata = dafBase.PropertySet()

dataId = _parseDataId(parsedCmdLine.dataId)
processes = parsedCmdLine.processes

pipeline = apPipe.ApPipeTask(workspace.workButler,
os.path.join(workspace.outputRepo, 'association.db'))

metadata.combine(_process(metricsJob, pipeline, workspace, dataId, processes))
log.info('Single-frame processing complete')

metadata.combine(_difference(metricsJob, pipeline, workspace, dataId, processes))
log.info('Image differencing complete')
metadata.combine(_associate(metricsJob, pipeline, workspace, dataId, processes))
log.info('Source association complete')

_postProcess(workspace)
log.info('Pipeline complete')
return metadata
try:
_process(pipeline, workspace, dataId, processes)
log.info('Single-frame processing complete')

_difference(pipeline, workspace, dataId, processes)
log.info('Image differencing complete')
_associate(pipeline, workspace, dataId, processes)
log.info('Source association complete')

_postProcess(workspace)
log.info('Pipeline complete')
return pipeline.getFullMetadata()
finally:
# Recover any metrics from completed pipeline steps, even if the pipeline fails
_updateMetrics(pipeline.getFullMetadata(), metricsJob)


def _deStringDataId(dataId):
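
The "acceptable cost" mentioned in the commit message is ordinary Python behavior: an exception raised in a `finally` clause while another exception is propagating replaces it, which is what `_updateMetrics` in the new `finally` block could do. A minimal demonstration, independent of the LSST stack:

```python
# Why a failure while extracting metrics can hide the pipeline's own error:
# an exception raised in a `finally` clause replaces the exception in flight.

def demo():
    try:
        raise RuntimeError("pipeline step failed")        # the "real" error
    finally:
        # If metric extraction fails here, *this* exception propagates instead.
        # Python 3 keeps the original as __context__; Python 2 (suggested by
        # the `future` import above) loses it entirely.
        raise ValueError("could not store measurements")


try:
    demo()
except Exception as exc:
    print(type(exc).__name__, "-", exc)
    print("original error:", repr(getattr(exc, "__context__", None)))
```
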
