DM-26629: update to use calibration collections #54

Merged (3 commits, Sep 26, 2020)

Changes from 2 commits

35 changes: 25 additions & 10 deletions bin.src/certifyCalibration.py
@@ -29,10 +29,12 @@
import argparse
import logging

import astropy.time

import lsst.log
from lsst.log import Log

from lsst.daf.butler import Butler
from lsst.daf.butler import Butler, Timespan

if __name__ == "__main__":
parser = argparse.ArgumentParser(
@@ -44,16 +46,23 @@
parser.add_argument("outputCollection", help="Output collection to add to.")
parser.add_argument("datasetTypeName", help="Dataset type to bless.")

parser.add_argument(
"--search-all-inputs",
dest="lastRunOnly",
action="store_false",
default=True,
help=(
"Search all children of the given input collection if it is a "
"CHAINED collection, instead of just the most recent one."
)
)
parser.add_argument("-v", "--verbose", action="store_const", dest="logLevel",
default=Log.INFO, const=Log.DEBUG,
help="Set the log level to DEBUG.")
parser.add_argument("-b", "--beginDate",
help="Start date for using the calibration")
parser.add_argument("-e", "--endDate",
help="End date for using the calibration")
parser.add_argument("-s", "--skipCalibrationLabel", action="store_true",
default=False, dest="skipCL",
help="Do not attempt to register the calibration label.")

args = parser.parse_args()
log = Log.getLogger("lsst.daf.butler")
@@ -66,12 +75,18 @@

butler = Butler(args.root, run=args.inputCollection)

# I'm not sure this is the best way to convert strings to Time objects, but
# it works fine on the bare dates we seem to be passing in these days, and
# I imagine it should work with more complete time strings as well.
timespan = Timespan(
begin=astropy.time.Time(args.beginDate) if args.beginDate is not None else None,
end=astropy.time.Time(args.endDate) if args.endDate is not None else None,
)

# Do the thing.
certify = CertifyCalibration(butler=butler,
certify = CertifyCalibration(registry=butler.registry,
inputCollection=args.inputCollection,
outputCollection=args.outputCollection)
outputCollection=args.outputCollection,
lastRunOnly=not args.lastRunOnly)
Contributor:

This is a bit confusing, but I see it's because of the user-facing option and the CertifyCalibration option having different meanings.

Member Author:

You were right to be confused; it's wrong, and it's how I broke this script (I added the command-line option after I tested ci_cpp_gen3 with the value defaulted to True in the task). Fixing it now.
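A minimal sketch of what the corrected call would presumably look like once that fix lands (it is not part of the two commits shown here): with dest="lastRunOnly" and action="store_false", the parsed value already carries the meaning the task expects, so it can be forwarded without negation.

    # Hypothetical corrected call: args.lastRunOnly defaults to True, and
    # --search-all-inputs sets it to False, so no inversion is needed.
    certify = CertifyCalibration(registry=butler.registry,
                                 inputCollection=args.inputCollection,
                                 outputCollection=args.outputCollection,
                                 lastRunOnly=args.lastRunOnly)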


certify.findInputs(args.datasetTypeName)
if not args.skipCL:
certify.addCalibrationLabel(beginDate=args.beginDate, endDate=args.endDate)
certify.registerCalibrations(args.datasetTypeName)
certify.run(args.datasetTypeName, timespan)
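For reference, a minimal, self-contained sketch of the date handling the new script relies on (the dates themselves are placeholders): astropy.time.Time accepts both bare dates and fuller timestamps, and an omitted bound is passed through as None to leave that side of the validity range open.

    import astropy.time
    from lsst.daf.butler import Timespan

    # Bare ISO dates and full timestamps both parse.
    begin = astropy.time.Time("2020-01-01")
    end = astropy.time.Time("2020-12-31T23:59:59")
    bounded = Timespan(begin=begin, end=end)

    # Omitting a date leaves that side of the range unbounded.
    unbounded = Timespan(begin=None, end=None)
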
2 changes: 1 addition & 1 deletion pipelines/cpBias.yaml
@@ -21,7 +21,7 @@ tasks:
class: lsst.cp.pipe.cpCombine.CalibCombineTask
config:
connections.inputExps: 'cpBiasProc'
connections.outputData: 'biasProposal'
connections.outputData: 'bias'
calibrationType: 'bias'
exposureScaling: "None"
contracts:
2 changes: 1 addition & 1 deletion pipelines/cpDark.yaml
@@ -26,7 +26,7 @@ tasks:
class: lsst.cp.pipe.cpCombine.CalibCombineTask
config:
connections.inputExps: 'cpDarkProc'
connections.outputData: 'darkProposal'
connections.outputData: 'dark'
calibrationType: 'dark'
exposureScaling: "DarkTime"
python: config.mask.append("CR")
2 changes: 1 addition & 1 deletion pipelines/cpFlat.yaml
@@ -32,7 +32,7 @@ tasks:
config:
connections.inputExps: 'cpFlatProc'
connections.inputScales: 'cpFlatNormScales'
connections.outputData: 'flatProposal'
connections.outputData: 'flat'
calibrationType: 'flat'
calibrationDimensions: ['physical_filter']
exposureScaling: InputList
2 changes: 1 addition & 1 deletion pipelines/cpFlatSingleChip.yaml
@@ -21,7 +21,7 @@ tasks:
class: lsst.cp.pipe.cpCombine.CalibCombineTask
config:
connections.inputExps: 'cpFlatProc'
connections.outputData: 'flatProposal'
connections.outputData: 'flat'
calibrationType: 'flat'
calibrationDimensions: ['physical_filter']
exposureScaling: None
2 changes: 1 addition & 1 deletion pipelines/cpFringe.yaml
@@ -25,7 +25,7 @@ tasks:
class: lsst.cp.pipe.cpCombine.CalibCombineTask
config:
connections.inputExps: 'cpFringeProc'
connections.outputData: 'fringeProposal'
connections.outputData: 'fringe'
calibrationType: 'fringe'
calibrationDimensions: ['physical_filter']
exposureScaling: "None"
2 changes: 1 addition & 1 deletion pipelines/measureCrosstalk.yaml
@@ -30,4 +30,4 @@ tasks:
class: lsst.cp.pipe.measureCrosstalk.CrosstalkSolveTask
config:
connections.inputRatios: 'cpCrosstalkRatio'
connections.outputCrosstalk: 'crosstalkProposal'
connections.outputCrosstalk: 'crosstalk'
74 changes: 74 additions & 0 deletions python/lsst/cp/pipe/_lookupStaticCalibration.py
@@ -0,0 +1,74 @@
# This file is part of cp_pipe.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from lsst.obs.base import Instrument

__all__ = ["lookupStaticCalibration"]


def lookupStaticCalibration(datasetType, registry, quantumDataId, collections):
"""A lookup function override for QuantumGraph generation that allows a
PipelineTask to have an input dataset (usually a camera) that is formally a
calibration with a validity range, without having a temporal data ID for
the lookup, by asserting that there is in fact only one dataset for all time.

Parameters
----------
datasetType : `lsst.daf.butler.DatasetType`
Dataset type to look up.
registry : `lsst.daf.butler.Registry`
Registry for the data repository being searched.
quantumDataId : `lsst.daf.butler.DataCoordinate`
Data ID for the quantum of the task this dataset will be passed to.
This must include an "instrument" key, and should also include any
keys that are present in ``datasetType.dimensions``. If it has an
``exposure`` or ``visit`` key, that's a sign that this function is
not actually needed, as those come with the temporal information that
would allow a real validity-range lookup.
collections : `lsst.daf.butler.registry.CollectionSearch`
Collections passed by the user when generating a QuantumGraph. Ignored
by this function (see notes below).

Returns
-------
refs : `list` [ `DatasetRef` ]
A zero- or single-element list containing the matching dataset, if one
was found.

Notes
-----
This works by looking in the `~CollectionType.RUN` collection
that `lsst.obs.base.Instrument.writeCuratedCalibrations` (currently!) uses,
instead of the collections passed into it. This may be considered
surprising by users (but will usually go unnoticed because the dataset
returned _is_ actually in those given input colllections, too). It may
stop working entirely once we have data repositories with multiple
calibration collections; a better workaround or a more principled change
Contributor:

This implies that multiple calibration collections are not allowed? And that writeCuratedCalibrations puts those calibrations into a RUN collection? Does that mean they need to be certified into the calibration collection?
Typo on line 5: "colllections".

Member Author (@TallJimbo, Sep 25, 2020):

Multiple calibration collections are allowed, but we lack both naming conventions and a good workflow for populating more than one with curated calibrations, and this function essentially relies on those inadequate naming conventions.

More precisely, writeCuratedCalibrations first puts datasets into multiple RUN collections (one for each calibDate, plus one for all unbounded datasets) and then certifies them all into one CALIBRATION collection. It has a suffix parameter for the collection names, to make it possible to write to a totally distinct suite of collections each time it is invoked, but nothing changes the suffix right now (see the comment from Tim that I pinged you on in obs_base). This workaround will be fine as long as writeCuratedCalibrations has been invoked with no suffix argument on the repo, and it will do the right thing as long as this is actually the version of the curated calibrations the user wants - it'll pick up the version created with no suffix regardless.

And, of course, it will only work as long as the camera's validity range actually is unbounded.

to the PipelineTasks that use this function (which are by definition asking
for something ill-defined) will ultimately be needed.
"""
instrument = Instrument.fromName(quantumDataId["instrument"], registry)
unboundedCollection = instrument.makeUnboundedCalibrationRunName()
ref = registry.findDataset(datasetType, dataId=quantumDataId, collections=[unboundedCollection])
if ref is None:
return []
else:
return [ref]
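
Not shown in this diff: a sketch of how a task's connections might opt into this lookup, assuming the lookupFunction argument accepted by prerequisite-input connections in lsst.pipe.base; the connection class name, dimensions, and doc string below are illustrative only.

    import lsst.pipe.base as pipeBase
    import lsst.pipe.base.connectionTypes as cT

    from lsst.cp.pipe._lookupStaticCalibration import lookupStaticCalibration


    class ExampleConnections(pipeBase.PipelineTaskConnections,
                             dimensions=("instrument", "exposure", "detector")):
        # The camera is formally a calibration, but this quantum carries no
        # timespan, so defer the lookup to lookupStaticCalibration.
        camera = cT.PrerequisiteInput(
            name="camera",
            doc="Camera geometry to use; assumed valid for all time.",
            storageClass="Camera",
            dimensions=("instrument",),
            isCalibration=True,
            lookupFunction=lookupStaticCalibration,
        )
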
174 changes: 37 additions & 137 deletions python/lsst/cp/pipe/cpCertify.py
@@ -18,12 +18,10 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
from astropy.time import Time

import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
from lsst.daf.butler import DatasetType
from lsst.daf.butler import CollectionType


class CertifyCalibration(pipeBase.Task):
@@ -34,154 +32,56 @@ class CertifyCalibration(pipeBase.Task):

Parameters
----------
butler : `lsst.daf.butler.Butler`
Butler repository to use.
registry : `lsst.daf.butler.Registry`
Registry pointing at the butler repository to operate on.
inputCollection : `str`
Data collection to pull calibrations from.
Data collection to pull calibrations from. Usually an existing
`~CollectionType.RUN` or `~CollectionType.CHAINED` collection, and may
_not_ be a `~CollectionType.CALIBRATION` collection or a nonexistent
collection.
outputCollection : `str`
Data collection to store final calibrations.
Data collection to store final calibrations. If it already exists, it
must be a `~CollectionType.CALIBRATION` collection. If not, a new
`~CollectionType.CALIBRATION` collection with this name will be
registered.
lastRunOnly : `bool`, optional
If `True` (default) and ``inputCollection`` is a
`~CollectionType.CHAINED` collection, only search its first child
collection (which usually corresponds to the last processing run),
instead of all child collections in the chain. This behavior ensures
that datasets in a collection used as input to that processing run
are never included in the certification.
**kwargs :
Additional arguments forwarded to `lsst.pipe.base.Task.__init__`.
"""
_DefaultName = 'CertifyCalibration'
ConfigClass = pexConfig.Config

def __init__(self, *, butler, inputCollection, outputCollection,
**kwargs):
def __init__(self, *, registry, inputCollection, outputCollection, lastRunOnly=True, **kwargs):
super().__init__(**kwargs)
self.butler = butler
self.registry = self.butler.registry
self.registry = registry
if lastRunOnly:
try:
inputCollection, _ = next(iter(self.registry.getCollectionChain(inputCollection)))
except TypeError:
# Not a CHAINED collection; do nothing.
pass
self.inputCollection = inputCollection
self.outputCollection = outputCollection

self.calibrationLabel = None
self.instrument = None

def findInputs(self, datasetTypeName, inputDatasetTypeName=None):
"""Find and prepare inputs for blessing.

Parameters
----------
datasetTypeName : `str`
Dataset that will be blessed.
inputDatasetTypeName : `str`, optional
Dataset name for the input datasets. Default to
datasetTypeName + "Proposal".

Raises
------
RuntimeError
Raised if no input datasets found or if the calibration
label exists and is not empty.
"""
if inputDatasetTypeName is None:
inputDatasetTypeName = datasetTypeName + "Proposal"

self.inputValues = list(self.registry.queryDatasets(inputDatasetTypeName,
collections=[self.inputCollection],
deduplicate=True))
# THIS IS INELEGANT AT BEST => fixed by passing deduplicate=True above.
# self.inputValues = list(filter(lambda vv: self.inputCollection in vv.run, self.inputValues))

if len(self.inputValues) == 0:
raise RuntimeError(f"No inputs found for dataset {inputDatasetTypeName} "
f"in {self.inputCollection}")

# Construct calibration label and choose instrument to use.
self.calibrationLabel = f"{datasetTypeName}/{self.inputCollection}"
self.instrument = self.inputValues[0].dataId['instrument']

# Prepare combination of new data ids and object data:
self.newDataIds = [value.dataId for value in self.inputValues]

self.objects = [self.butler.get(value) for value in self.inputValues]

def registerCalibrations(self, datasetTypeName):
"""Add blessed inputs to the output collection.

Parameters
----------
datasetTypeName : `str`
Dataset type these calibrations will be registered for.
"""
# Find/make the run we will use for the output
self.registry.registerRun(self.outputCollection)
self.butler.run = self.outputCollection
self.butler.collection = None

try:
self.registerDatasetType(datasetTypeName, self.newDataIds[0])
except Exception as e:
print(f"Could not registerDatasetType {datasetTypeName}. Failure {e}?")

with self.butler.transaction():
for newId, data in zip(self.newDataIds, self.objects):
self.butler.put(data, datasetTypeName, dataId=newId,
calibration_label=self.calibrationLabel,
producer=None)

def registerDatasetType(self, datasetTypeName, dataId):
"""Ensure registry can handle this dataset type.
def run(self, datasetTypeName, timespan):
"""Certify all of the datasets of the given type in the input
collection.

Parameters
----------
datasetTypeName : `str`
Name of the dataset that will be registered.
dataId : `lsst.daf.butler.dataId`
Data ID providing the list of dimensions for the new
datasetType.
Name of the dataset type to certify.
timespan : `lsst.daf.butler.Timespan`
Timespan for the validity range.
"""
storageClassMap = {'crosstalk': 'CrosstalkCalib'}
storageClass = storageClassMap.get(datasetTypeName, 'ExposureF')

dimensionArray = set(list(dataId.keys()) + ["calibration_label"])
datasetType = DatasetType(datasetTypeName,
dimensionArray,
storageClass,
universe=self.butler.registry.dimensions)
self.butler.registry.registerDatasetType(datasetType)

def addCalibrationLabel(self, name=None, instrument=None,
beginDate="1970-01-01", endDate="2038-12-31"):

"""Method to allow tasks to add calibration_label for master calibrations.

Parameters
----------
name : `str`, optional
A unique string for the calibration_label key.
instrument : `str`, optional
Instrument this calibration is for.
beginDate : `str`, optional
An ISO 8601 date string for the beginning of the valid date range.
endDate : `str`, optional
An ISO 8601 date string for the end of the valid date range.

Raises
------
RuntimeError :
Raised if the instrument or calibration_label name are not set.
"""
if name is None:
name = self.calibrationLabel
if instrument is None:
instrument = self.instrument
if name is None and instrument is None:
raise RuntimeError("Instrument and calibration_label name not set.")

try:
existingValues = self.registry.queryDataIds(['calibration_label'],
instrument=self.instrument,
calibration_label=name)
existingValues = [a for a in existingValues]
print(f"Found {len(existingValues)} Entries for {self.calibrationLabel}")
except LookupError:
self.butler.registry.insertDimensionData(
"calibration_label",
{
"name": name,
"instrument": instrument,
"datetime_begin": Time(datetime.datetime.fromisoformat(beginDate), scale='utc'),
"datetime_end": Time(datetime.datetime.fromisoformat(endDate), scale='utc'),
}
)
refs = set(self.registry.queryDatasets(datasetTypeName, collections=[self.inputCollection]))
if not refs:
raise RuntimeError(f"No inputs found for dataset {datasetTypeName} in {self.inputCollection}.")
self.registry.registerCollection(self.outputCollection, type=CollectionType.CALIBRATION)
self.registry.certify(self.outputCollection, refs, timespan)
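
Taken together with the updated bin.src/certifyCalibration.py above, the reworked task is driven roughly as follows; the repository path, collection names, dataset type, and validity range are placeholders, and the import path simply mirrors the file location in this package.

    import astropy.time
    from lsst.daf.butler import Butler, Timespan
    from lsst.cp.pipe.cpCertify import CertifyCalibration

    # Placeholder repo and collections: the input is the RUN (or CHAINED)
    # collection the proposed calibration was written to; the output is (or
    # becomes) a CALIBRATION collection.
    butler = Butler("/repo/example", run="u/someone/biasGen")

    certify = CertifyCalibration(registry=butler.registry,
                                 inputCollection="u/someone/biasGen",
                                 outputCollection="calib/bias",
                                 lastRunOnly=True)

    # Certify every 'bias' dataset found in the input collection as valid for
    # calendar year 2020.
    timespan = Timespan(begin=astropy.time.Time("2020-01-01"),
                        end=astropy.time.Time("2021-01-01"))
    certify.run("bias", timespan)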