DM-15806: Generalize database handling code #56

Merged 6 commits on Dec 10, 2018
9 changes: 3 additions & 6 deletions python/lsst/ap/verify/ap_verify.py
@@ -36,8 +36,8 @@
 from .dataset import Dataset
 from .ingestion import ingestDataset
 from .metrics import MetricsParser, checkSquashReady, AutoJob
-from .pipeline_driver import ApPipeParser, runApPipe, _getConfig
-from .measurements import measureFromButlerRepo, measureFromPpdb
+from .pipeline_driver import ApPipeParser, runApPipe
+from .measurements import measureFromButlerRepo
 from .workspace import Workspace


@@ -144,10 +144,7 @@ def _measureFinalProperties(metricsJob, workspace, args):
         All command-line arguments passed to this program, including those
         supported by `lsst.ap.verify.pipeline_driver.ApPipeParser`.
     """
-    measurements = []
-    measurements.extend(measureFromButlerRepo(workspace.outputRepo, args.dataId))
-    # TODO: Add butler storage and retrieval of the Ppdb config. DM-16645
-    measurements.extend(measureFromPpdb(_getConfig(workspace).ppdb))
+    measurements = measureFromButlerRepo(workspace.analysisButler, args.dataId)
 
     for measurement in measurements:
         metricsJob.measurements.insert(measurement)
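The simplification above leans on measureFromButlerRepo now returning the full list of measurements, Ppdb ones included. For reference, a minimal sketch of how such a list feeds the verification job via the lsst.verify API; the metric name and value here are illustrative, not taken from this PR:

import astropy.units as u
import lsst.verify

job = lsst.verify.Job()
# A stand-in measurement; in ap_verify these come from measureFromButlerRepo.
measurement = lsst.verify.Measurement("ip_diffim.numSciSources", 1500 * u.count)
job.measurements.insert(measurement)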
23 changes: 11 additions & 12 deletions python/lsst/ap/verify/measurements/compute_metrics.py
@@ -27,15 +27,12 @@
 defined here, rather than depending on individual measurement functions.
 """
 
-__all__ = ["measureFromButlerRepo",
-           "measureFromPpdb"]
+__all__ = ["measureFromButlerRepo"]
 
 import re
 
 from lsst.ap.pipe import ApPipeTask
 from lsst.ap.verify.config import Config
-import lsst.daf.persistence as dafPersist
-import lsst.dax.ppdb as daxPpdb
 from .profiling import measureRuntime
 from .association import measureNumberNewDiaObjects, \
     measureNumberUnassociatedDiaObjects, \
@@ -90,13 +87,13 @@ def measureFromMetadata(metadata):
     return result
 
 
-def measureFromButlerRepo(repo, dataId):
+def measureFromButlerRepo(butler, dataId):
     """Create measurements from a butler repository.
 
     Parameters
     ----------
-    repo : `str`
-        The output repository location to read from disk.
+    butler : `lsst.daf.persistence.Butler`
+        A butler opened to the repository to read.
     dataId : `str`
         Butler identifier naming the data to be processed (e.g., visit and
         ccdnum) formatted in the usual way (e.g., 'visit=54321 ccdnum=7').
@@ -110,7 +107,6 @@ def measureFromButlerRepo(repo, dataId):
 
     dataIdDict = _convertDataIdString(dataId)
 
-    butler = dafPersist.Butler(repo)
     measurement = measureNumberSciSources(
         butler, dataIdDict, "ip_diffim.numSciSources")
     if measurement is not None:
@@ -121,6 +117,9 @@ def measureFromButlerRepo(repo, dataId):
     if measurement is not None:
         result.append(measurement)
 
+    config = butler.get(ApPipeTask._DefaultName + '_config')
+    result.extend(measureFromPpdb(config.ppdb))
+
     metadata = butler.get(ApPipeTask._DefaultName + '_metadata', dataId=dataIdDict)
     result.extend(measureFromMetadata(metadata))
     return result
@@ -162,15 +161,15 @@ def _convertDataIdString(dataId):
     return dataIdDict
 
 
-def measureFromPpdb(config):
+def measureFromPpdb(configurable):
     """Make measurements on a ppdb database containing the results of
     source association.
 
-    configurable : `lsst.pex.config.Config`
-        ApVerify configuration with Ppdb configs set.
+    configurable : `lsst.pex.config.ConfigurableInstance`
+        A configurable object for a `lsst.dax.ppdb.Ppdb` or similar type.
     """
     result = []
-    ppdb = daxPpdb.Ppdb(config=config)
+    ppdb = configurable.apply()
     measurement = measureTotalUnassociatedDiaObjects(
         ppdb, "ap_association.totalUnassociatedDiaObjects")
     if measurement is not None:
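The new measureFromPpdb takes a ConfigurableInstance rather than a bare config so it can construct the database object itself via apply(). A minimal sketch of that pattern, assuming standard lsst.pex.config behavior; the MyDb classes are hypothetical stand-ins for lsst.dax.ppdb.Ppdb:

import lsst.pex.config as pexConfig


class MyDbConfig(pexConfig.Config):
    dbUrl = pexConfig.Field(dtype=str, doc="Connection string.", default="sqlite://")


class MyDb:
    ConfigClass = MyDbConfig

    def __init__(self, config):
        # apply() passes the stored config as the `config` keyword.
        self.config = config


class PipelineConfig(pexConfig.Config):
    ppdb = pexConfig.ConfigurableField(doc="Database factory.", target=MyDb)


config = PipelineConfig()
config.ppdb.dbUrl = "sqlite:///association.db"
db = config.ppdb.apply()  # constructs MyDb with the stored config, as measureFromPpdb now does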
3 changes: 1 addition & 2 deletions python/lsst/ap/verify/pipeline_driver.py
@@ -138,8 +138,6 @@ def runApPipe(metricsJob, workspace, parsedCmdLine):
     log = lsst.log.Log.getLogger('ap.verify.pipeline_driver.runApPipe')
 
     dataId = _parseDataId(parsedCmdLine.dataId)
-    # After processes are implemented, remove the flake exception
-    processes = parsedCmdLine.processes  # noqa: F841
 
     # Insert job metadata including dataId
     metricsJob.meta.update({'instrument': _extract_instrument_from_butler(workspace.workButler)})
@@ -149,6 +147,7 @@ def runApPipe(metricsJob, workspace, parsedCmdLine):
     try:
         for dataRef in dafPersist.searchDataRefs(workspace.workButler, datasetType='raw',
                                                  dataId=dataId):
+            pipeline.writeConfig(dataRef.getButler(), clobber=True, doBackup=False)
            pipeline.runDataRef(dataRef)
            pipeline.writeMetadata(dataRef)
        log.info('Pipeline complete')
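The added writeConfig call is the counterpart of the butler.get(ApPipeTask._DefaultName + '_config') lookup added in compute_metrics.py: CmdLineTask.writeConfig persists the task configuration as a <task name>_config dataset in the output repository, where the measurement code can later retrieve it. A rough sketch of the round trip, assuming a configured ApPipeTask named pipeline and an output repository butler (this requires the LSST stack to run):

from lsst.ap.pipe import ApPipeTask

# Processing side (pipeline_driver.py): persist the config with the outputs.
pipeline.writeConfig(butler, clobber=True, doBackup=False)

# Measurement side (compute_metrics.py): recover it and construct the Ppdb.
config = butler.get(ApPipeTask._DefaultName + '_config')
ppdb = config.ppdb.apply()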
3 changes: 2 additions & 1 deletion tests/test_driver.py
@@ -76,7 +76,8 @@ def setUp(self):
         # Fake Butler to avoid Workspace initialization overhead
         butler = self.setUpMockPatch("lsst.daf.persistence.Butler", autospec=True)
         butler.getMapperClass.return_value = lsst.obs.test.TestMapper
-        self.setUpMockPatch("lsst.daf.persistence.searchDataRefs", return_value=[{"visit": 42, "ccd": 0}])
+        dataRef = self.setUpMockPatch("lsst.daf.persistence.ButlerDataRef", autospec=True)
+        self.setUpMockPatch("lsst.daf.persistence.searchDataRefs", return_value=[dataRef])
 
         self.job = lsst.verify.Job()
         self.workspace = Workspace(self._testDir)
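setUpMockPatch itself is not part of this diff; a plausible implementation, sketched here with unittest.mock directly, starts a patcher and registers its cleanup so each test gets a fresh, automatically restored mock:

import unittest
from unittest.mock import patch


class DriverTestSuite(unittest.TestCase):
    def setUpMockPatch(self, target, **kwargs):
        """Patch `target` for the duration of one test and return the started mock."""
        patcher = patch(target, **kwargs)
        mock = patcher.start()
        self.addCleanup(patcher.stop)  # restore the original even if the test fails
        return mock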