Call new ap_pipe API.
These are the minimal changes needed to make ap_verify work with
ap_pipe again. Optimization of pipeline_driver will happen on a
different ticket.
kfindeisen committed Feb 28, 2018
1 parent 01e1148 commit 953f110
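
For reference, the shape of the API change is roughly this: the old module-level ap_pipe helpers took repository paths, while the new ApPipeTask is constructed once from a Butler plus an association database path and then run per data reference. A minimal sketch assuming a hypothetical workspace location and placeholder data ID keys:

import os

from lsst.ap import pipe as apPipe
from lsst.ap.verify.workspace import Workspace

workspace = Workspace('workspaces/example')  # hypothetical workspace location
dataId = {'visit': 412518, 'ccdnum': 25}     # placeholder data ID keys

# Old API (removed in this commit): standalone functions driven by repository paths.
# apPipe.doProcessCcd(workspace.dataRepo, workspace.calibRepo,
#                     workspace.outputRepo, dataId, skip=False)

# New API (this commit): one ApPipeTask bound to a Butler and an association DB,
# invoked on Butler data references.
pipeline = apPipe.ApPipeTask(workspace.workButler,
                             os.path.join(workspace.outputRepo, 'association.db'))
dataRef = workspace.workButler.dataRef('raw', **dataId)
metadata = pipeline.runProcessCcd(dataRef).fullMetadata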
Showing 2 changed files with 37 additions and 25 deletions.
49 changes: 27 additions & 22 deletions python/lsst/ap/verify/pipeline_driver.py
@@ -34,6 +34,7 @@
__all__ = ["ApPipeParser", "MeasurementStorageError", "runApPipe"]

import argparse
+import os
import re
from functools import wraps
from future.utils import raise_from
@@ -142,14 +143,16 @@ def wrapper(job, *args, **kwargs):


@_MetricsRecovery
-def _process(workspace, dataId, parallelization):
+def _process(pipeline, workspace, dataId, parallelization):
"""Run single-frame processing on a dataset.
Parameters
----------
+    pipeline : `lsst.ap.pipe.ApPipeTask`
+        An instance of the AP pipeline.
workspace : `lsst.ap.verify.workspace.Workspace`
The abstract location containing input and output repositories.
-    dataId : `str`
+    dataId : `dict` of any
Butler identifier naming the data to be processed by the underlying
task(s).
parallelization : `int`
@@ -160,22 +163,21 @@ def _process(workspace, dataId, parallelization):
metadata : `lsst.daf.base.PropertySet`
The full metadata from any Tasks called by this method, or `None`.
"""
-    return apPipe.doProcessCcd(workspace.dataRepo,
-                               workspace.calibRepo,
-                               workspace.outputRepo,
-                               dataId,
-                               skip=False)
+    dataRef = workspace.workButler.dataRef('raw', **dataId)
+    return pipeline.runProcessCcd(dataRef).fullMetadata


@_MetricsRecovery
-def _difference(workspace, dataId, parallelization):
+def _difference(pipeline, workspace, dataId, parallelization):
"""Run image differencing on a dataset.
Parameters
----------
+    pipeline : `lsst.ap.pipe.ApPipeTask`
+        An instance of the AP pipeline.
workspace : `lsst.ap.verify.workspace.Workspace`
The abstract location containing input and output repositories.
-    dataId : `str`
+    dataId : `dict` of any
Butler identifier naming the data to be processed by the underlying
task(s).
parallelization : `int`
@@ -186,23 +188,21 @@ def _difference(workspace, dataId, parallelization):
metadata : `lsst.daf.base.PropertySet`
The full metadata from any Tasks called by this method, or `None`.
"""
-    return apPipe.doDiffIm(workspace.outputRepo,
-                           dataId,
-                           'coadd',
-                           workspace.templateRepo,
-                           workspace.outputRepo,
-                           skip=False)
+    dataRef = workspace.workButler.dataRef('calexp', **dataId)
+    return pipeline.runDiffIm(dataRef).fullMetadata


@_MetricsRecovery
-def _associate(workspace, dataId, parallelization):
+def _associate(pipeline, workspace, dataId, parallelization):
"""Run source association on a dataset.
Parameters
----------
+    pipeline : `lsst.ap.pipe.ApPipeTask`
+        An instance of the AP pipeline.
workspace : `lsst.ap.verify.workspace.Workspace`
The abstract location containing output repositories.
-    dataId : `str`
+    dataId : `dict` of any
Butler identifier naming the data to be processed by the underlying
task(s).
parallelization : `int`
@@ -213,7 +213,8 @@ def _associate(workspace, dataId, parallelization):
metadata : `lsst.daf.base.PropertySet`
The full metadata from any Tasks called by this method, or `None`.
"""
-    return apPipe.doAssociation(workspace.outputRepo, dataId, workspace.outputRepo, skip=False)
+    dataRef = workspace.workButler.dataRef('calexp', **dataId)
+    return pipeline.runAssociation(dataRef).fullMetadata


def _postProcess(workspace):
@@ -256,14 +257,18 @@ def runApPipe(metricsJob, workspace, parsedCmdLine):

metadata = dafBase.PropertySet()

-    dataId = parsedCmdLine.dataId
+    dataId = _parseDataId(parsedCmdLine.dataId)
processes = parsedCmdLine.processes
-    metadata.combine(_process(metricsJob, workspace, dataId, processes))
+
+    pipeline = apPipe.ApPipeTask(workspace.workButler,
+                                 os.path.join(workspace.outputRepo, 'association.db'))
+
+    metadata.combine(_process(metricsJob, pipeline, workspace, dataId, processes))
log.info('Single-frame processing complete')

-    metadata.combine(_difference(metricsJob, workspace, dataId, processes))
+    metadata.combine(_difference(metricsJob, pipeline, workspace, dataId, processes))
log.info('Image differencing complete')
-    metadata.combine(_associate(metricsJob, workspace, dataId, processes))
+    metadata.combine(_associate(metricsJob, pipeline, workspace, dataId, processes))
log.info('Source association complete')

_postProcess(workspace)
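Note that runApPipe now hands the helpers a parsed dict data ID via a _parseDataId helper that is not shown in this diff. A hypothetical sketch of such a parser, assuming a space-separated key=value command-line syntax (the real helper may accept a different format):

def parseDataIdSketch(idString):
    """Hypothetical stand-in for pipeline_driver._parseDataId (not part of this diff).

    Turns a command-line style data ID such as "visit=412518 ccdnum=25" into
    the dict expected by _process, _difference, and _associate.
    """
    dataId = {}
    for pair in idString.split():
        key, _, value = pair.partition('=')
        # Keep integer-valued keys as ints so Butler lookups match exactly.
        dataId[key] = int(value) if value.isdigit() else value
    return dataId

# parseDataIdSketch("visit=412518 ccdnum=25") == {'visit': 412518, 'ccdnum': 25}
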
13 changes: 10 additions & 3 deletions python/lsst/ap/verify/workspace.py
@@ -60,6 +60,9 @@ def __init__(self, location):
os.makedirs(location, stat.S_IRWXU | stat.S_IRGRP | stat.S_IROTH)

self._location = location
+        # Lazy evaluation to optimize workButler and analysisButler
+        self._workButler = None
+        self._analysisButler = None

@property
def dataRepo(self):
@@ -90,7 +93,9 @@ def workButler(self):
"""A Butler that can produce pipeline inputs and outputs
(`lsst.daf.persistence.Butler`, read-only).
"""
-        return self._makeButler()
+        if self._workButler is None:
+            self._workButler = self._makeButler()
+        return self._workButler

def _makeButler(self):
"""Create a butler for accessing the entire workspace.
@@ -106,7 +111,7 @@ def _makeButler(self):
Assumes all `*Repo` properties have been initialized.
"""
# common arguments for butler elements
mapperArgs = {"calibRoot": self.calibRepo}
mapperArgs = {"calibRoot": os.path.abspath(self.calibRepo)}

inputs = [{"root": self.dataRepo, "mapperArgs": mapperArgs}]
outputs = [{"root": self.outputRepo, "mode": "rw", "mapperArgs": mapperArgs}]
@@ -120,4 +125,6 @@
def analysisButler(self):
"""A Butler that can read pipeline outputs (`lsst.daf.persistence.Butler`, read-only).
"""
-        return dafPersist.Butler(inputs={"root": self.outputRepo, "mode": "r"})
+        if self._analysisButler is None:
+            self._analysisButler = dafPersist.Butler(inputs={"root": self.outputRepo, "mode": "r"})
+        return self._analysisButler
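
Both Butler properties now use the same memoized-property pattern: build the Butler on first access and reuse it afterwards, so repeated attribute lookups return the same expensive-to-construct object. A toy sketch of the pattern, not LSST code:

class LazyHolderSketch:
    """Toy illustration of the caching added to Workspace above."""

    def __init__(self):
        self._workButler = None            # created on first access, then reused

    @property
    def workButler(self):
        if self._workButler is None:       # same guard as Workspace.workButler
            self._workButler = self._makeButler()
        return self._workButler

    def _makeButler(self):
        # Stands in for the expensive dafPersist.Butler construction.
        return object()

holder = LazyHolderSketch()
assert holder.workButler is holder.workButler  # one instance, not one per access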
