Skip to content

Commit

Permalink
Remove code for calling individual subtasks.
Browse files Browse the repository at this point in the history
This code was a vestige of when ap_pipe was not itself a Task, and
individual tasks' metadata stopped being used long ago.
  • Loading branch information
kfindeisen committed Oct 17, 2018
1 parent a66fb6a commit 3d3bb33
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 87 deletions.
84 changes: 3 additions & 81 deletions python/lsst/ap/verify/pipeline_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,76 +104,6 @@ def _updateMetrics(metadata, job):
raise MeasurementStorageError('Task metadata could not be read; possible downstream bug') from e


def _process(pipeline, workspace, dataId, parallelization):
    """Run single-frame (CCD) processing over every raw exposure in a dataset.

    Parameters
    ----------
    pipeline : `lsst.ap.pipe.ApPipeTask`
        An instance of the AP pipeline.
    workspace : `lsst.ap.verify.workspace.Workspace`
        The abstract location containing input and output repositories.
    dataId : `dict` from `str` to any
        Butler identifier naming the data to be processed by the underlying
        task(s).
    parallelization : `int`
        Parallelization level at which to run underlying task(s).
        Currently unreferenced by this function.
    """
    rawRefs = dafPersist.searchDataRefs(workspace.workButler,
                                        datasetType='raw', dataId=dataId)
    for rawRef in rawRefs:
        pipeline.runProcessCcd(rawRef)


def _difference(pipeline, workspace, dataId, parallelization):
    """Run image differencing over every calibrated exposure in a dataset.

    Parameters
    ----------
    pipeline : `lsst.ap.pipe.ApPipeTask`
        An instance of the AP pipeline.
    workspace : `lsst.ap.verify.workspace.Workspace`
        The abstract location containing input and output repositories.
    dataId : `dict` from `str` to any
        Butler identifier naming the data to be processed by the underlying
        task(s).
    parallelization : `int`
        Parallelization level at which to run underlying task(s).
        Currently unreferenced by this function.
    """
    calexpRefs = dafPersist.searchDataRefs(workspace.workButler,
                                           datasetType='calexp', dataId=dataId)
    for calexpRef in calexpRefs:
        pipeline.runDiffIm(calexpRef)


def _associate(pipeline, workspace, dataId, parallelization):
    """Run source association over every calibrated exposure in a dataset.

    Parameters
    ----------
    pipeline : `lsst.ap.pipe.ApPipeTask`
        An instance of the AP pipeline.
    workspace : `lsst.ap.verify.workspace.Workspace`
        The abstract location containing output repositories.
    dataId : `dict` from `str` to any
        Butler identifier naming the data to be processed by the underlying
        task(s).
    parallelization : `int`
        Parallelization level at which to run underlying task(s).
        Currently unreferenced by this function.
    """
    calexpRefs = dafPersist.searchDataRefs(workspace.workButler,
                                           datasetType='calexp', dataId=dataId)
    for calexpRef in calexpRefs:
        pipeline.runAssociation(calexpRef)


def _postProcess(workspace):
"""Run post-processing on a dataset.
This step is called the "afterburner" in some design documents.
Parameters
----------
workspace : `lsst.ap.verify.workspace.Workspace`
The abstract location containing output repositories.
"""
pass


def runApPipe(metricsJob, workspace, parsedCmdLine):
"""Run `ap_pipe` on this object's dataset.
Expand All @@ -196,20 +126,12 @@ def runApPipe(metricsJob, workspace, parsedCmdLine):
log = lsst.log.Log.getLogger('ap.verify.pipeline_driver.runApPipe')

dataId = _parseDataId(parsedCmdLine.dataId)
processes = parsedCmdLine.processes

pipeline = apPipe.ApPipeTask(workspace.workButler, config=_getConfig(workspace))
try:
_process(pipeline, workspace, dataId, processes)
log.info('Single-frame processing complete')

_difference(pipeline, workspace, dataId, processes)
log.info('Image differencing complete')
_associate(pipeline, workspace, dataId, processes)
log.info('Source association complete')

_postProcess(workspace)
for dataRef in dafPersist.searchDataRefs(workspace.workButler, datasetType='calexp', dataId=dataId):
for dataRef in dafPersist.searchDataRefs(workspace.workButler, datasetType='raw',
dataId=dataId):
pipeline.runDataRef(dataRef)
pipeline.writeMetadata(dataRef)
log.info('Pipeline complete')
finally:
Expand Down
8 changes: 2 additions & 6 deletions tests/test_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,9 @@ def setUpMockPatch(self, target, **kwargs):
def testRunApPipeSteps(self, _mockConfig, mockClass):
"""Test that runApPipe runs the entire pipeline.
"""
# This test case is sensitive to the implementation of pipeline_driver
# Specifically, it needs to know that ApPipeTask.run is not called
pipeline_driver.runApPipe(self.job, self.workspace, self.apPipeArgs)

# NOTE(review): the assertions below mix two generations of the driver —
# per-step mocks (runProcessCcd/runDiffIm/runAssociation) and the unified
# runDataRef entry point. This looks like interleaved diff text from the
# refactor that removed the per-subtask calls; confirm which set is
# current before relying on this test.
mockClass.return_value.runProcessCcd.assert_called_once()
mockClass.return_value.runDiffIm.assert_called_once()
mockClass.return_value.runAssociation.assert_called_once()
mockClass.return_value.runDataRef.assert_called_once()

def testUpdateMetricsEmpty(self):
"""Test that _updateMetrics does not add metrics if no job files are provided.
Expand Down Expand Up @@ -167,7 +163,7 @@ def testUpdateMetricsOnError(self, _mockConfig, mockClass):
metadata.add("lsst.ap.pipe.ccdProcessor.verify_json_path", subtaskFile)

mockClass.return_value.getFullMetadata.return_value = metadata
mockClass.return_value.runDiffIm.side_effect = RuntimeError("DECam is weird!")
mockClass.return_value.runDataRef.side_effect = RuntimeError("DECam is weird!")

self.assertNotEqual(self.job.measurements, self.subtaskJob.measurements)

Expand Down

0 comments on commit 3d3bb33

Please sign in to comment.