DM-11745: Investigate wrapping external function calls in ap_verify #10

Merged · 2 commits · Dec 19, 2017
10 changes: 5 additions & 5 deletions python/lsst/ap/verify/ap_verify.py
@@ -129,21 +129,21 @@ def _getOutputDir(inputDir, outputArg, rerunArg):
return os.path.join(inputDir, "rerun", rerunArg)


def _measureFinalProperties(metadata, outputDir, args, metricsJob):
def _measureFinalProperties(metricsJob, metadata, outputDir, args):
"""Measure any metrics that apply to the final result of the AP pipeline,
rather than to a particular processing stage.

Parameters
----------
metricsJob : `lsst.verify.Job`
The Job object to which to add any metric measurements made.
metadata : `lsst.daf.base.PropertySet`
The metadata produced by the AP pipeline.
outputDir : `str`
The location of the final processed data repository.
args : `argparse.Namespace`
All command-line arguments passed to this program, including those
supported by `lsst.ap.verify.pipeline_driver.ApPipeParser`.
metricsJob : `lsst.verify.Job`
The Job object to which to add any metric measurements made.
"""
# TODO: remove this function's dependency on pipeline_driver (possibly after DM-11372)
measurements = []
@@ -191,5 +191,5 @@ def runApVerify(cmdLine=None):

with AutoJob(args) as job:
log.info('Running pipeline...')
metadata = runApPipe(testData, output, args, job)
_measureFinalProperties(metadata, output, args, job)
metadata = runApPipe(job, testData, output, args)
_measureFinalProperties(job, metadata, output, args)
146 changes: 70 additions & 76 deletions python/lsst/ap/verify/pipeline_driver.py
@@ -20,11 +20,20 @@
# see <http://www.lsstcorp.org/LegalNotices/>.
#

"""Interface between `ap_verify` and `ap_pipe`.

This module handles calling `ap_pipe` and converting any information
as needed. It also attempts to collect measurements step-by-step, so
that a total pipeline failure still allows some measurements to be
recovered.
"""

from __future__ import absolute_import, division, print_function

__all__ = ["ApPipeParser", "MeasurementStorageError", "runApPipe"]

import argparse
from functools import wraps
from future.utils import raise_from

import json
@@ -69,9 +78,8 @@ def _updateMetrics(metadata, job):
Parameters
----------
metadata : `lsst.daf.base.PropertySet`
The metadata from running a task(s). No action taken if `None`.
Assumed to contain keys of the form
"<standard task prefix>.verify_json_path" that maps to the
The full metadata from the task(s) that were run. Assumed to contain keys of
the form "<standard task prefix>.verify_json_path" that map to the
absolute file location of that task's serialized measurements.
All other metadata fields are ignored.
job : `lsst.verify.Job`
@@ -84,8 +92,6 @@ def _updateMetrics(metadata, job):
A "verify_json_path" key does not map to a string, or serialized
measurements could not be located or read from disk.
"""
if metadata is None:
return
try:
keys = metadata.names(topLevelOnly=False)
files = [metadata.getAsString(key) for key in keys if key.endswith('verify_json_path')]
@@ -100,7 +106,41 @@ def _updateMetrics(metadata, job):
e)
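
Reviewer note: the key-harvesting convention that `_updateMetrics` relies on is easy to miss when skimming the hunk above. Below is a toy illustration; a flat dict stands in for `lsst.daf.base.PropertySet`, and `harvestMetricFiles` is a hypothetical helper written for this note, not part of the PR.

```python
# Toy illustration of the "verify_json_path" harvesting convention.
# A flat dict stands in for lsst.daf.base.PropertySet; harvestMetricFiles
# only mirrors the filtering that _updateMetrics performs.
def harvestMetricFiles(metadata):
    """Return the serialized-measurement files recorded in task metadata."""
    return [value for key, value in metadata.items()
            if key.endswith("verify_json_path")]

metadata = {
    "processCcd.verify_json_path": "/tmp/processCcd.verify.json",
    "processCcd.runtime": 12.3,  # ignored: not a verify_json_path key
}
assert harvestMetricFiles(metadata) == ["/tmp/processCcd.verify.json"]
```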


def _ingestRaws(dataset, workingRepo, metricsJob):
def _MetricsRecovery(pipelineStep):
"""Carry out a pipeline step while handling metrics defensively.

Parameters
----------
pipelineStep : callable
The pipeline step to decorate. Must return metadata from the task(s)
executed, or `None`.

Returns
-------
A callable that expects a `verify.Job` as its first parameter,
followed by the arguments to `pipelineStep`, in order. Its
behavior shall be to execute `pipelineStep`, update the `Job`
object with any metrics produced by `pipelineStep`, and return
(possibly empty) metadata.

The returned callable shall raise `MeasurementStorageError`
if measurements were made, but the `Job` object could not be
updated with them. Any side effects of `pipelineStep` shall
remain in effect in the event of this exception.
"""
@wraps(pipelineStep)
def wrapper(job, *args, **kwargs):
metadata = pipelineStep(*args, **kwargs)
if metadata is None:
metadata = dafBase.PropertySet()

_updateMetrics(metadata, job)
return metadata
return wrapper
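
Reviewer note: for anyone unfamiliar with this wrapping pattern, here is a minimal, self-contained sketch of what `_MetricsRecovery` does. The names (`metricsRecovery`, `ingest`) and the dict-based job are illustrative stand-ins, not the real `lsst.verify` or `lsst.daf.base` classes.

```python
from functools import wraps

def metricsRecovery(pipelineStep):
    @wraps(pipelineStep)
    def wrapper(job, *args, **kwargs):
        metadata = pipelineStep(*args, **kwargs) or {}  # normalize None
        # Stand-in for _updateMetrics: push this step's metrics into the job.
        job.setdefault("measurements", []).extend(
            metadata.get("measurements", []))
        return metadata
    return wrapper

@metricsRecovery
def ingest(path):
    return {"measurements": ["ingest.runtime"]}

job = {}
metadata = ingest(job, "/data/raw")  # the job is prepended, as in this PR
assert job["measurements"] == ["ingest.runtime"]
```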


@_MetricsRecovery
def _ingestRaws(dataset, workingRepo):
"""Ingest the raw data for use by LSST.

The original data directory shall not be modified.
@@ -112,26 +152,17 @@ def _ingestRaws(dataset, workingRepo, metricsJob):
workingRepo : `str`
The repository in which temporary products will be created. Must be
compatible with `dataset`.
metricsJob : `lsst.verify.Job`
The Job object to which to add any metric measurements made.

Returns
-------
metadata : `lsst.daf.base.PropertySet`
The metadata from any tasks called by this method. May be empty.

Raises
------
`lsst.ap.verify.pipeline_driver.MeasurementStorageError`
Measurements were made, but `metricsJob` could not be updated
with them.
The full metadata from any Tasks called by this method, or `None`.
"""
metadata = apPipe.doIngest(workingRepo, dataset.rawLocation, dataset.refcatsLocation)
_updateMetrics(metadata, metricsJob)
return metadata
return apPipe.doIngest(workingRepo, dataset.rawLocation, dataset.refcatsLocation)


def _ingestCalibs(dataset, workingRepo, metricsJob):
@_MetricsRecovery
def _ingestCalibs(dataset, workingRepo):
"""Ingest the raw calibrations for use by LSST.

The original calibration directory shall not be modified.
@@ -143,26 +174,17 @@ def _ingestCalibs(dataset, workingRepo, metricsJob):
workingRepo : `str`
The repository in which temporary products will be created. Must be
compatible with `dataset`.
metricsJob : `lsst.verify.Job`
The Job object to which to add any metric measurements made.

Returns
-------
metadata : `lsst.daf.base.PropertySet`
The metadata from any tasks called by this method. May be empty.

Raises
------
`lsst.ap.verify.pipeline_driver.MeasurementStorageError`
Measurements were made, but `metricsJob` could not be updated
with them.
The full metadata from any Tasks called by this method, or `None`.
"""
metadata = apPipe.doIngestCalibs(workingRepo, dataset.calibLocation, dataset.defectLocation)
_updateMetrics(metadata, metricsJob)
return metadata
return apPipe.doIngestCalibs(workingRepo, dataset.calibLocation, dataset.defectLocation)


def _process(workingRepo, dataId, parallelization, metricsJob):
@_MetricsRecovery
def _process(workingRepo, dataId, parallelization):
"""Run single-frame processing on a dataset.

Parameters
@@ -174,26 +196,17 @@ def _process(workingRepo, dataId, parallelization, metricsJob):
task(s).
parallelization : `int`
Parallelization level at which to run underlying task(s).
metricsJob : `lsst.verify.Job`
The Job object to which to add any metric measurements made.

Returns
-------
metadata : `lsst.daf.base.PropertySet`
The metadata from any tasks called by this method. May be empty.

Raises
------
`lsst.ap.verify.pipeline_driver.MeasurementStorageError`
Measurements were made, but `metricsJob` could not be updated
with them.
The full metadata from any Tasks called by this method, or `None`.
"""
metadata = apPipe.doProcessCcd(workingRepo, dataId)
_updateMetrics(metadata, metricsJob)
return metadata
return apPipe.doProcessCcd(workingRepo, dataId)


def _difference(dataset, workingRepo, dataId, parallelization, metricsJob):
@_MetricsRecovery
def _difference(dataset, workingRepo, dataId, parallelization):
"""Run image differencing on a dataset.

Parameters
@@ -207,26 +220,17 @@ def _difference(dataset, workingRepo, dataId, parallelization, metricsJob):
task(s).
parallelization : `int`
Parallelization level at which to run underlying task(s).
metricsJob : `lsst.verify.Job`
The Job object to which to add any metric measurements made.

Returns
-------
metadata : `lsst.daf.base.PropertySet`
The metadata from any tasks called by this method. May be empty.

Raises
------
`lsst.ap.verify.pipeline_driver.MeasurementStorageError`
Measurements were made, but `metricsJob` could not be updated
with them.
The full metadata from any Tasks called by this method, or `None`.
"""
metadata = apPipe.doDiffIm(workingRepo, dataset.templateLocation, dataId)
_updateMetrics(metadata, metricsJob)
return metadata
return apPipe.doDiffIm(workingRepo, dataset.templateLocation, dataId)


def _associate(workingRepo, dataId, parallelization, metricsJob):
@_MetricsRecovery
def _associate(workingRepo, dataId, parallelization):
"""Run source association on a dataset.

Parameters
@@ -238,23 +242,13 @@ def _associate(workingRepo, dataId, parallelization, metricsJob):
task(s).
parallelization : `int`
Parallelization level at which to run underlying task(s).
metricsJob : `lsst.verify.Job`
The Job object to which to add any metric measurements made.

Returns
-------
metadata : `lsst.daf.base.PropertySet`
The metadata from any tasks called by this method. May be empty.

Raises
------
`lsst.ap.verify.pipeline_driver.MeasurementStorageError`
Measurements were made, but `metricsJob` could not be updated
with them.
The full metadata from any Tasks called by this method, or `None`.
"""
metadata = apPipe.doAssociation(workingRepo, dataId)
_updateMetrics(metadata, metricsJob)
return metadata
return apPipe.doAssociation(workingRepo, dataId)


def _postProcess(workingRepo):
@@ -270,7 +264,7 @@ def _postProcess(workingRepo):
pass


def runApPipe(dataset, workingRepo, parsedCmdLine, metricsJob):
def runApPipe(metricsJob, dataset, workingRepo, parsedCmdLine):
"""Run `ap_pipe` on this object's dataset.

Parameters
@@ -300,19 +294,19 @@ def runApPipe(dataset, workingRepo, parsedCmdLine, metricsJob):

# Easiest way to defend against None return values
metadata = dafBase.PropertySet()
metadata.combine(_ingestRaws(dataset, workingRepo, metricsJob))
metadata.combine(_ingestCalibs(dataset, workingRepo, metricsJob))
metadata.combine(_ingestRaws(metricsJob, dataset, workingRepo))
metadata.combine(_ingestCalibs(metricsJob, dataset, workingRepo))
_getApPipeRepos(metadata)
log.info('Data ingested')

dataId = parsedCmdLine.dataId
processes = parsedCmdLine.processes
metadata.combine(_process(workingRepo, dataId, processes, metricsJob))
metadata.combine(_process(metricsJob, workingRepo, dataId, processes))
log.info('Single-frame processing complete')

metadata.combine(_difference(dataset, workingRepo, dataId, processes, metricsJob))
metadata.combine(_difference(metricsJob, dataset, workingRepo, dataId, processes))
log.info('Image differencing complete')
metadata.combine(_associate(workingRepo, dataId, processes, metricsJob))
metadata.combine(_associate(metricsJob, workingRepo, dataId, processes))
log.info('Source association complete')

_postProcess(workingRepo)
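
Reviewer note: the step-by-step collection in `runApPipe` matters because `AutoJob` (used in `ap_verify.py`) can persist whatever was measured before a failure. The sketch below is a guess at that mechanism using a hypothetical context manager; the real `AutoJob` API may differ.

```python
import contextlib
import json

@contextlib.contextmanager
def autoJob(path):
    job = {"measurements": []}  # stand-in for lsst.verify.Job
    try:
        yield job
    finally:
        # Persist whatever was collected, even if a later step raised.
        with open(path, "w") as outfile:
            json.dump(job, outfile)

def processStep(job):
    job["measurements"].append("ingest.runtime")  # recorded before the crash
    raise RuntimeError("single-frame processing failed")

try:
    with autoJob("metrics.json") as job:
        processStep(job)
except RuntimeError:
    pass  # metrics.json still contains the ingest measurement
```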