Merge pull request #417 from lsst/tickets/DM-27164
DM-27164: Add task to compute and persist VisitSummary tables
erykoff committed Nov 13, 2020
2 parents bdec61e + 264d5b7 commit 22db31c
Showing 5 changed files with 292 additions and 0 deletions.
3 changes: 3 additions & 0 deletions bin.src/consolidateVisitSummary.py
@@ -0,0 +1,3 @@
#!/usr/bin/env python
from lsst.pipe.tasks.postprocess import ConsolidateVisitSummaryTask
ConsolidateVisitSummaryTask.parseAndRun()
@@ -0,0 +1,93 @@
.. lsst-task-topic:: lsst.pipe.tasks.postprocess.ConsolidateVisitSummaryTask

###########################
ConsolidateVisitSummaryTask
###########################

``ConsolidateVisitSummaryTask`` combines non-trivial per-detector metadata, including the wcs, detector information, psf size and shape, filter, and bounding box corners, into one per-visit exposure catalog (dataset `visitSummary`).

``ConsolidateVisitSummaryTask`` is available as a :ref:`command-line task <lsst.pipe.tasks-command-line-tasks>`, :command:`consolidateVisitSummary.py`.

.. _lsst.pipe.tasks.postprocess.ConsolidateVisitSummaryTask-summary:

Processing summary
==================

``ConsolidateVisitSummaryTask`` reads in metadata from the detector-level processed exposures (dataset `calexp`) for a given visit, combines these data into an exposure catalog, and writes the result out as a visit-level summary catalog (dataset `visitSummary`).
The metadata from each exposure/detector includes:

- The ``visitInfo``.
- The ``wcs``.
- The ``photoCalib``.
- The ``physical_filter`` and ``band`` (if available).
- The psf size, shape, and effective area at the center of the detector.
- The corners of the bounding box in right ascension/declination.

.. _lsst.pipe.tasks.postprocess.ConsolidateVisitSummaryTask-cli:

consolidateVisitSummary.py command-line interface
=================================================

.. code-block:: text

   consolidateVisitSummary.py REPOPATH [@file [@file2 ...]] [--output OUTPUTREPO | --rerun RERUN] [--id] [other options]

Key arguments:

.. option:: REPOPATH

   The input Butler repository's URI or file path.

Key options:

.. option:: --id

   The data IDs to process.

.. seealso::

   See :ref:`command-line-task-argument-reference` for details and additional options.

.. _lsst.pipe.tasks.postprocess.ConsolidateVisitSummaryTask-api:

Python API summary
==================

.. lsst-task-api-summary:: lsst.pipe.tasks.postprocess.ConsolidateVisitSummaryTask

.. _lsst.pipe.tasks.postprocess.ConsolidateVisitSummaryTask-butler:

Butler datasets
===============

When run as the ``consolidateVisitSummary.py`` command-line task, or directly through the `~lsst.pipe.tasks.postprocess.ConsolidateVisitSummaryTask.runDataRef` method, ``ConsolidateVisitSummaryTask`` obtains datasets from the input Butler data repository and persists outputs to the output Butler data repository.

.. _lsst.pipe.tasks.postprocess.ConsolidateVisitSummaryTask-butler-inputs:

Input datasets
--------------

``calexp``
    Per-detector, processed exposures with metadata (wcs, psf, etc.)

.. _lsst.pipe.tasks.postprocess.ConsolidateVisitSummaryTask-butler-outputs:

Output datasets
---------------

``visitSummary``
    Per-visit summary catalog of ccd/visit metadata.


.. _lsst.pipe.tasks.postprocess.ConsolidateVisitSummaryTask-examples:

Examples
========

The following command shows an example of how to run the task on an example HSC repository.

.. code-block:: bash

   consolidateVisitSummary.py /datasets/hsc/repo --rerun <rerun name> --id visit=30504

.. _lsst.pipe.tasks.postprocess.ConsolidateVisitSummaryTask-debug:
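
The PSF summary quantities named in the processing summary (size, shape, and effective area) can be illustrated without the LSST stack. The following is a minimal sketch, not the task's implementation: it builds a pixel-sampled circular Gaussian in place of a real PSF model, uses plain unweighted second moments (the stack's ``computeShape`` may compute moments differently), and the helper names ``gaussian_kernel`` and ``psf_summary`` are hypothetical. The effective-area expression mirrors the one in this commit, ``sum(image) / sum(image**2)``, which assumes a kernel image normalized to unit sum.

```python
import math


def gaussian_kernel(sigma, half_width):
    """Return a normalized, pixel-sampled circular Gaussian as nested lists."""
    coords = range(-half_width, half_width + 1)
    kernel = [[math.exp(-(x * x + y * y) / (2.0 * sigma ** 2)) for x in coords]
              for y in coords]
    total = sum(sum(row) for row in kernel)
    return [[value / total for value in row] for row in kernel]


def psf_summary(kernel):
    """Compute second moments, determinant radius, and effective area.

    Assumes the kernel is centered on its middle pixel and sums to one.
    """
    half_width = len(kernel) // 2
    ixx = iyy = ixy = flux = flux_sq = 0.0
    for row_index, row in enumerate(kernel):
        y = row_index - half_width
        for col_index, value in enumerate(row):
            x = col_index - half_width
            ixx += value * x * x
            iyy += value * y * y
            ixy += value * x * y
            flux += value
            flux_sq += value * value
    return {
        "psfIxx": ixx / flux,
        "psfIyy": iyy / flux,
        "psfIxy": ixy / flux,
        # Determinant radius: fourth root of the moments-matrix determinant.
        "psfSigma": ((ixx * iyy - ixy * ixy) / flux ** 2) ** 0.25,
        # Same expression as the task: sum(image) / sum(image**2).
        "psfArea": flux / flux_sq,
    }


summary = psf_summary(gaussian_kernel(sigma=2.0, half_width=20))
```

For a circular Gaussian of width sigma, the determinant radius recovers sigma and the effective area approaches 4 * pi * sigma**2, which gives a quick sanity check on values stored in ``psfSigma`` and ``psfArea``.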
1 change: 1 addition & 0 deletions pipelines/DRP.yaml
@@ -1,6 +1,7 @@
description: Base Pipeline that describes the Data Release Production process.
inherits:
  - location: $PIPE_TASKS_DIR/pipelines/_SingleFrame.yaml
  - location: $PIPE_TASKS_DIR/pipelines/_ConsolidateVisit.yaml
  - location: $PIPE_TASKS_DIR/pipelines/_Coaddition.yaml
  - location: $PIPE_TASKS_DIR/pipelines/_Multiband.yaml
  - location: $PIPE_TASKS_DIR/pipelines/_Forced.yaml
8 changes: 8 additions & 0 deletions pipelines/_ConsolidateVisit.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
description: ConsolidateVisit
tasks:
  consolidateVisitSummary: lsst.pipe.tasks.postprocess.ConsolidateVisitSummaryTask
subsets:
  consolidateVisit:
    subset:
      - consolidateVisitSummary
    description: A set of tasks to run to consolidate all detectors from a visit
187 changes: 187 additions & 0 deletions python/lsst/pipe/tasks/postprocess.py
@@ -21,11 +21,13 @@

import functools
import pandas as pd
import numpy as np
from collections import defaultdict

import lsst.geom
import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
from lsst.pipe.base import connectionTypes
import lsst.afw.table as afwTable
from lsst.meas.base import SingleFrameMeasurementTask
from lsst.pipe.base import CmdLineTask, ArgumentParser, DataIdContainer
@@ -789,6 +791,191 @@ def _makeArgumentParser(cls):
        return parser


class ConsolidateVisitSummaryConnections(pipeBase.PipelineTaskConnections,
                                         dimensions=("instrument", "visit",),
                                         defaultTemplates={}):
    calexp = connectionTypes.Input(
        doc="Processed exposures used for metadata",
        name="calexp",
        storageClass="ExposureF",
        dimensions=("instrument", "visit", "detector"),
        deferLoad=True,
        multiple=True,
    )
    visitSummary = connectionTypes.Output(
        doc="Consolidated visit-level exposure metadata",
        name="visitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
    )


class ConsolidateVisitSummaryConfig(pipeBase.PipelineTaskConfig,
                                    pipelineConnections=ConsolidateVisitSummaryConnections):
    """Config for ConsolidateVisitSummaryTask"""
    pass


class ConsolidateVisitSummaryTask(pipeBase.PipelineTask, pipeBase.CmdLineTask):
    """Task to consolidate per-detector visit metadata.

    This task aggregates the following metadata from all the detectors in a
    single visit into an exposure catalog:

    - The visitInfo.
    - The wcs.
    - The photoCalib.
    - The physical_filter and band (if available).
    - The psf size, shape, and effective area at the center of the detector.
    - The corners of the bounding box in right ascension/declination.

    Other quantities such as Psf, ApCorrMap, and TransmissionCurve are not
    persisted here because of storage concerns, and because of their limited
    utility as summary statistics.

    Tests for this task are performed in ci_hsc_gen3.
    """
    _DefaultName = "consolidateVisitSummary"
    ConfigClass = ConsolidateVisitSummaryConfig

    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)

        parser.add_id_argument("--id", "calexp",
                               help="data ID, e.g. --id visit=12345",
                               ContainerClass=VisitDataIdContainer)
        return parser

    def writeMetadata(self, dataRef):
        """No metadata to persist, so override to remove metadata persistence.
        """
        pass

    def writeConfig(self, butler, clobber=False, doBackup=True):
        """No config to persist, so override to remove config persistence.
        """
        pass

    def runDataRef(self, dataRefList):
        visit = dataRefList[0].dataId['visit']

        self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)" %
                       (len(dataRefList), visit))

        expCatalog = self._combineExposureMetadata(visit, dataRefList, isGen3=False)

        dataRefList[0].put(expCatalog, 'visitSummary', visit=visit)

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        dataRefs = butlerQC.get(inputRefs.calexp)
        visit = dataRefs[0].dataId.byName()['visit']

        self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)" %
                       (len(dataRefs), visit))

        expCatalog = self._combineExposureMetadata(visit, dataRefs)

        butlerQC.put(expCatalog, outputRefs.visitSummary)

    def _combineExposureMetadata(self, visit, dataRefs, isGen3=True):
        """Make a combined exposure catalog from a list of dataRefs.

        Parameters
        ----------
        visit : `int`
            Visit identification number
        dataRefs : `list`
            List of calexp dataRefs in visit. May be list of
            `lsst.daf.persistence.ButlerDataRef` (Gen2) or
            `lsst.daf.butler.DeferredDatasetHandle` (Gen3).
        isGen3 : `bool`, optional
            Specifies if this is a Gen3 list of datarefs.

        Returns
        -------
        visitSummary : `lsst.afw.table.ExposureCatalog`
            Exposure catalog with per-detector summary information.
        """
        schema = afwTable.ExposureTable.makeMinimalSchema()
        schema.addField('visit', type='I', doc='Visit number')
        schema.addField('detector_id', type='I', doc='Detector number')
        schema.addField('physical_filter', type='String', size=32, doc='Physical filter')
        schema.addField('band', type='String', size=32, doc='Name of band')
        schema.addField('psfSigma', type='F',
                        doc='PSF model second-moments determinant radius (center of chip) (pixel)')
        schema.addField('psfArea', type='F',
                        doc='PSF model effective area (center of chip) (pixel**2)')
        schema.addField('psfIxx', type='F',
                        doc='PSF model Ixx (center of chip) (pixel**2)')
        schema.addField('psfIyy', type='F',
                        doc='PSF model Iyy (center of chip) (pixel**2)')
        schema.addField('psfIxy', type='F',
                        doc='PSF model Ixy (center of chip) (pixel**2)')
        schema.addField('raCorners', type='ArrayD', size=4,
                        doc='Right Ascension of bounding box corners (degrees)')
        schema.addField('decCorners', type='ArrayD', size=4,
                        doc='Declination of bounding box corners (degrees)')

        cat = afwTable.ExposureCatalog(schema)
        cat.resize(len(dataRefs))

        cat['visit'] = visit

        for i, dataRef in enumerate(dataRefs):
            if isGen3:
                visitInfo = dataRef.get(component='visitInfo')
                filter_ = dataRef.get(component='filter')
                psf = dataRef.get(component='psf')
                wcs = dataRef.get(component='wcs')
                photoCalib = dataRef.get(component='photoCalib')
                detector = dataRef.get(component='detector')
                bbox = dataRef.get(component='bbox')
                validPolygon = dataRef.get(component='validPolygon')
            else:
                # Note that we need to read the calexp because there is
                # no magic access to the psf except through the exposure.
                gen2_read_bbox = lsst.geom.BoxI(lsst.geom.PointI(0, 0), lsst.geom.PointI(1, 1))
                exp = dataRef.get(datasetType='calexp_sub', bbox=gen2_read_bbox)
                visitInfo = exp.getInfo().getVisitInfo()
                filter_ = exp.getFilter()
                psf = exp.getPsf()
                wcs = exp.getWcs()
                photoCalib = exp.getPhotoCalib()
                detector = exp.getDetector()
                bbox = dataRef.get(datasetType='calexp_bbox')
                validPolygon = exp.getInfo().getValidPolygon()

            rec = cat[i]
            rec.setBBox(bbox)
            rec.setVisitInfo(visitInfo)
            rec.setWcs(wcs)
            rec.setPhotoCalib(photoCalib)
            rec.setDetector(detector)
            rec.setValidPolygon(validPolygon)

            # TODO: When RFC-730 is implemented we can fill both of these.
            rec['physical_filter'] = filter_.getName()
            rec['band'] = ''
            rec['detector_id'] = detector.getId()
            shape = psf.computeShape(bbox.getCenter())
            rec['psfSigma'] = shape.getDeterminantRadius()
            rec['psfIxx'] = shape.getIxx()
            rec['psfIyy'] = shape.getIyy()
            rec['psfIxy'] = shape.getIxy()
            im = psf.computeKernelImage(bbox.getCenter())
            # The calculation of effective psf area is taken from
            # meas_base/src/PsfFlux.cc#L112. See
            # https://github.com/lsst/meas_base/blob/
            # 750bffe6620e565bda731add1509507f5c40c8bb/src/PsfFlux.cc#L112
            rec['psfArea'] = np.sum(im.array)/np.sum(im.array**2.)

            sph_pts = wcs.pixelToSky(lsst.geom.Box2D(bbox).getCorners())
            rec['raCorners'][:] = [sph.getRa().asDegrees() for sph in sph_pts]
            rec['decCorners'][:] = [sph.getDec().asDegrees() for sph in sph_pts]

        return cat


class VisitDataIdContainer(DataIdContainer):
    """DataIdContainer that groups sensor-level id's by visit
    """
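
The body of ``VisitDataIdContainer`` is not shown in this hunk, so the following is only a sketch of the idea its docstring describes: collecting per-detector data IDs under their shared visit so that the task receives one list per visit. The function name ``group_by_visit``, the dictionary-based data IDs, and the ``ccd`` key are all hypothetical stand-ins, not the container's actual API.

```python
from collections import defaultdict


def group_by_visit(data_ids):
    """Group per-detector data IDs (dicts) by their 'visit' value."""
    grouped = defaultdict(list)
    for data_id in data_ids:
        grouped[data_id["visit"]].append(data_id)
    # Return a plain dict, sorted by visit, so the result is deterministic.
    return {visit: grouped[visit] for visit in sorted(grouped)}


# Hypothetical data IDs; the 'ccd' key is only an illustrative detector key.
data_ids = [
    {"visit": 30504, "ccd": 0},
    {"visit": 30504, "ccd": 1},
    {"visit": 30482, "ccd": 0},
]
by_visit = group_by_visit(data_ids)
```

Each value in ``by_visit`` plays the role of the ``dataRefList`` passed to ``runDataRef``, where every entry shares the same visit.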