Skip to content

Commit

Permalink
Merge pull request #303 from lsst/tickets/DM-26629
Browse files Browse the repository at this point in the history
DM-26629: switch to calibration collections instead of the calibration_label dimension
  • Loading branch information
TallJimbo committed Sep 26, 2020
2 parents 0830830 + aff2274 commit d64a390
Show file tree
Hide file tree
Showing 17 changed files with 599 additions and 409 deletions.
344 changes: 227 additions & 117 deletions python/lsst/obs/base/_instrument.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion python/lsst/obs/base/cli/butler_cmd_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,5 +88,5 @@ def test_cli(self):
result = runner.invoke(butler.cli, ["write-curated-calibrations",
"here",
"--instrument", self.instrumentName,
"--output-run", "output_run"])
"--collection", "collection"])
self.assertEqual(result.exit_code, 0, f"output: {result.output} exception: {result.exception}")
6 changes: 5 additions & 1 deletion python/lsst/obs/base/cli/cmd/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,11 @@ def register_instrument(*args, **kwargs):
@click.command(short_help="Add an instrument's curated calibrations.")
@repo_argument(required=True)
@instrument_option(required=True)
@run_option(required=False)
@click.option("--collection", required=False,
help="Name of the calibration collection that associates datasets with validity ranges.")
@click.option("--suffix", required=False,
help=("Name suffix to append (with an automatic delimiter) to all RUN collection names "
"as well as the calibration collection name if it is not provided via --collection."))
@options_file_option()
def write_curated_calibrations(*args, **kwargs):
"""Add an instrument's curated calibrations to the data repository.
Expand Down
164 changes: 108 additions & 56 deletions python/lsst/obs/base/gen2to3/calibRepoConverter.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,19 @@

__all__ = ["CalibRepoConverter"]

from collections import defaultdict
import os
import sqlite3
from typing import TYPE_CHECKING, Dict, Iterator, List, Mapping, Tuple, Optional

import astropy.time
from typing import TYPE_CHECKING, Dict, Iterator, Tuple, Optional

from lsst.daf.butler import DataCoordinate, FileDataset, Timespan
from .repoConverter import RepoConverter
from .repoWalker import RepoWalker
from .translators import makeCalibrationLabel

if TYPE_CHECKING:
from lsst.daf.butler import StorageClass, FormatterParameter
from lsst.daf.butler import DatasetType, StorageClass, FormatterParameter
from .repoWalker.scanner import PathElementHandler
from ..cameraMapper import CameraMapper
from ..mapping import Mapping as CameraMapperMapping # disambiguate from collections.abc.Mapping
Expand All @@ -51,9 +53,10 @@ class CalibRepoConverter(RepoConverter):
`RepoConverter`.
"""

def __init__(self, *, mapper: CameraMapper, **kwds):
super().__init__(**kwds)
def __init__(self, *, mapper: CameraMapper, collection: str, **kwds):
super().__init__(run=None, **kwds)
self.mapper = mapper
self.collection = collection
self._datasetTypes = set()

def isDatasetTypeSpecial(self, datasetTypeName: str) -> bool:
Expand Down Expand Up @@ -83,68 +86,117 @@ def makeRepoWalkerTarget(self, datasetTypeName: str, template: str, keys: Dict[s
self._datasetTypes.add(target.datasetType)
return target

def insertDimensionData(self):
# Docstring inherited from RepoConverter.
# This has only been tested on HSC, and it's not clear how general it
# is. The catch is that it needs to generate calibration_label strings
# consistent with those produced by the Translator system.

def _queryGen2CalibRegistry(self, db: sqlite3.Connection, datasetType: DatasetType, calibDate: str
) -> Iterator[sqlite3.Row]:
# TODO: docs
fields = ["validStart", "validEnd"]
if "detector" in datasetType.dimensions.names:
fields.append(self.task.config.ccdKey)
else:
fields.append(f"NULL AS {self.task.config.ccdKey}")
if "physical_filter" in datasetType.dimensions.names:
fields.append("filter")
else:
assert "band" not in datasetType.dimensions.names
fields.append("NULL AS filter")
tables = self.mapper.mappings[datasetType.name].tables
if tables is None or len(tables) == 0:
self.task.log.warn("Could not extract calibration ranges for %s in %s; "
"no tables in Gen2 mapper.",
datasetType.name, self.root, tables[0])
return
query = f"SELECT DISTINCT {', '.join(fields)} FROM {tables[0]} WHERE calibDate = ?;"
try:
results = db.execute(query, (calibDate,))
except sqlite3.OperationalError as e:
self.task.log.warn("Could not extract calibration ranges for %s in %s from table %s: %r",
datasetType.name, self.root, tables[0], e)
return
yield from results

def _finish(self, datasets: Mapping[DatasetType, Mapping[Optional[str], List[FileDataset]]]):
# Read Gen2 calibration repository and extract validity ranges for
# all datasetType + calibDate combinations we ingested.
calibFile = os.path.join(self.root, "calibRegistry.sqlite3")

# If the registry file does not exist this indicates a problem.
# We check explicitly because sqlite will try to create the
# missing file if it can.
if not os.path.exists(calibFile):
raise RuntimeError("Attempting to convert calibrations but no registry database"
f" found in {self.root}")
# We will gather results in a dict-of-lists keyed by Timespan, since
# Registry.certify operates on one Timespan and multiple refs at a
# time.
refsByTimespan = defaultdict(list)
db = sqlite3.connect(calibFile)
db.row_factory = sqlite3.Row
records = []
for datasetType in self._datasetTypes:
if "calibration_label" not in datasetType.dimensions:
day = astropy.time.TimeDelta(1, format="jd", scale="tai")
for datasetType, datasetsByCalibDate in datasets.items():
if not datasetType.isCalibration():
continue
fields = ["validStart", "validEnd", "calibDate"]
gen2keys = {}
if "detector" in datasetType.dimensions.names:
fields.append(self.task.config.ccdKey)
else:
fields.append(f"NULL AS {self.task.config.ccdKey}")
if ("physical_filter" in datasetType.dimensions.names
or "band" in datasetType.dimensions.names):
fields.append("filter")
else:
fields.append("NULL AS filter")
query = f"SELECT DISTINCT {', '.join(fields)} FROM {datasetType.name};"
try:
results = db.execute(query)
except sqlite3.OperationalError as e:
if (self.mapper.mappings[datasetType.name].tables is None
or len(self.mapper.mappings[datasetType.name].tables) == 0):
self.task.log.warn("Could not extract calibration ranges for %s in %s: %r",
datasetType.name, self.root, e)
continue
# Try using one of the alternate table names in the mapper (e.g. cpBias->bias for DECam).
name = self.mapper.mappings[datasetType.name].tables[0]
query = f"SELECT DISTINCT {', '.join(fields)} FROM {name};"
try:
results = db.execute(query)
except sqlite3.OperationalError as e:
self.task.log.warn("Could not extract calibration ranges for %s in %s: %r",
datasetType.name, self.root, e)
continue
for row in results:
label = makeCalibrationLabel(datasetType.name, row["calibDate"],
ccd=row[self.task.config.ccdKey], filter=row["filter"])
# For validity times we use TAI as some gen2 repos have validity
# dates very far in the past or future.
day = astropy.time.TimeDelta(1, format="jd", scale="tai")
records.append({
"instrument": self.task.instrument.getName(),
"name": label,
"datetime_begin": astropy.time.Time(row["validStart"], format="iso", scale="tai"),
"datetime_end": astropy.time.Time(row["validEnd"], format="iso", scale="tai") + day
})
if records:
self.task.registry.insertDimensionData("calibration_label", *records)
gen2keys[self.task.config.ccdKey] = int
if "physical_filter" in datasetType.dimensions.names:
gen2keys["filter"] = str
translator = self.instrument.makeDataIdTranslatorFactory().makeMatching(
datasetType.name,
gen2keys,
instrument=self.instrument.getName()
)
for calibDate, datasetsForCalibDate in datasetsByCalibDate.items():
assert calibDate is not None, ("datasetType.isCalibration() is set by "
"the presence of calibDate in the Gen2 template")
# Build a mapping that lets us find DatasetRefs by data ID,
# for this DatasetType and calibDate. We know there is only
# one ref for each data ID (given DatasetType and calibDate as
# well).
refsByDataId = {}
for dataset in datasetsForCalibDate:
refsByDataId.update((ref.dataId, ref) for ref in dataset.refs)
# Query the Gen2 calibration repo for the validity ranges for
# this DatasetType and calibDate, and look up the appropriate
# refs by data ID.
for row in self._queryGen2CalibRegistry(db, datasetType, calibDate):
# For validity times we use TAI as some gen2 repos have validity
# dates very far in the past or future.
timespan = Timespan(
astropy.time.Time(row["validStart"], format="iso", scale="tai"),
astropy.time.Time(row["validEnd"], format="iso", scale="tai") + day,
)
# Make a Gen2 data ID from query results.
gen2id = {}
if "detector" in datasetType.dimensions.names:
gen2id[self.task.config.ccdKey] = row[self.task.config.ccdKey]
if "physical_filter" in datasetType.dimensions.names:
gen2id["filter"] = row["filter"]
# Translate that to Gen3.
gen3id, _ = translator(gen2id)
dataId = DataCoordinate.standardize(gen3id, graph=datasetType.dimensions)
ref = refsByDataId.get(dataId)
if ref is not None:
refsByTimespan[timespan].append(ref)
else:
# The Gen2 calib registry mentions this dataset, but it
# isn't included in what we've ingested. This might
# sometimes be a problem, but it should usually
# represent someone just trying to convert a subset of
# the Gen2 repo, so I don't think it's appropriate to
# warn or even log at info, since in that case there
# may be a _lot_ of these messages.
self.task.log.debug(
"Gen2 calibration registry entry has no dataset: %s for calibDate=%s, %s.",
datasetType.name, calibDate, dataId
)
# Done reading from Gen2, time to certify into Gen3.
for timespan, refs in refsByTimespan.items():
self.task.registry.certify(self.collection, refs, timespan)

def getRun(self, datasetTypeName: str, calibDate: Optional[str] = None) -> str:
    """Return the name of the RUN collection a converted dataset belongs in.

    Parameters
    ----------
    datasetTypeName : `str`
        Name of the dataset type being converted.
    calibDate : `str`, optional
        Gen2 ``calibDate`` for the dataset, if it is a calibration.

    Returns
    -------
    run : `str`
        RUN collection name: a per-calibDate Gen2 calibration collection
        name when ``calibDate`` is given, otherwise whatever the base
        class chooses for this dataset type.
    """
    if calibDate is not None:
        # Calibrations are grouped into one run per Gen2 calibDate.
        return self.instrument.makeCollectionName("calib", "gen2", calibDate)
    return super().getRun(datasetTypeName)

# Class attributes that will be shadowed by public instance attributes;
# defined here only for documentation purposes.
Expand Down
45 changes: 23 additions & 22 deletions python/lsst/obs/base/gen2to3/convertRepo.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,8 +449,8 @@ def run(self, root: str, *,
and/or reference catalogs.
calibs : `dict`
Dictionary mapping calibration repository path to the
`~lsst.daf.butler.CollectionType.RUN` collection that converted
datasets within it should be inserted into.
`~lsst.daf.butler.CollectionType.CALIBRATION` collection that
converted datasets within it should be certified into.
reruns : `list` of `Rerun`
Specifications for rerun (processing output) collections to
convert.
Expand All @@ -472,20 +472,23 @@ def run(self, root: str, *,
converters = []
rootConverter = RootRepoConverter(task=self, root=root, subset=subset, instrument=self.instrument)
converters.append(rootConverter)
for calibRoot, run in calibs.items():
for calibRoot, collection in calibs.items():
if not os.path.isabs(calibRoot):
calibRoot = os.path.join(rootConverter.root, calibRoot)
converter = CalibRepoConverter(task=self, root=calibRoot, run=run, instrument=self.instrument,
converter = CalibRepoConverter(task=self, root=calibRoot, collection=collection,
instrument=self.instrument,
mapper=rootConverter.mapper,
subset=rootConverter.subset)
converters.append(converter)
rerunConverters = {}
for spec in reruns:
runRoot = spec.path
if not os.path.isabs(runRoot):
runRoot = os.path.join(rootConverter.root, runRoot)
converter = StandardRepoConverter(task=self, root=runRoot, run=spec.runName,
instrument=self.instrument, subset=rootConverter.subset)
converters.append(converter)
rerunConverters[spec.runName] = converter

# Register the instrument if we're configured to do so.
if self.config.doRegisterInstrument:
Expand All @@ -508,10 +511,12 @@ def run(self, root: str, *,
# here.
if self.config.doWriteCuratedCalibrations:
butler3 = Butler3(butler=self.butler3)
# Write curated calibrations to any new calibration collections we
# created by converting a Gen2 calibration repo.
calibCollections = set()
for run in calibs.values():
self.instrument.writeCuratedCalibrations(butler3, run=run)
calibCollections.add(run)
for collection in calibs.values():
self.instrument.writeCuratedCalibrations(butler3, collection=collection)
calibCollections.add(collection)
# Ensure that we have the curated calibrations even if there
# is no calibration conversion. It's possible that the default
# calib collection will have been specified (in fact the
Expand All @@ -521,7 +526,7 @@ def run(self, root: str, *,
# writeCuratedCalibrations default itself
defaultCalibCollection = self.instrument.makeCollectionName("calib")
if defaultCalibCollection not in calibCollections:
self.instrument.writeCuratedCalibrations(butler3, run=defaultCalibCollection)
self.instrument.writeCuratedCalibrations(butler3, collection=defaultCalibCollection)

# Define visits (also does nothing if we weren't configurd to convert
# the 'raw' dataset type).
Expand All @@ -531,19 +536,6 @@ def run(self, root: str, *,
for converter in converters:
converter.prep()

# Insert dimensions needed by any converters. In practice this is just
# calibration_labels right now, because exposures and visits (and
# things related to them) are handled by RawIngestTask and
# DefineVisitsTask earlier and skymaps are handled later.
#
# Note that we do not try to filter dimensions down to just those
# related to the given visits, even if config.relatedOnly is True; we
# need them in the Gen3 repo in order to be able to know which datasets
# to convert, because Gen2 alone doesn't know enough about the
# relationships between data IDs.
for converter in converters:
converter.insertDimensionData()

# Insert dimensions that are potentially shared by all Gen2
# repositories (and are hence managed directly by the Task, rather
# than a converter instance).
Expand All @@ -566,12 +558,21 @@ def run(self, root: str, *,
for converter in converters:
converter.ingest()

# Perform any post-ingest processing.
for converter in converters:
converter.finish()

# Add chained collections for reruns.
for spec in reruns:
if spec.chainName is not None:
self.butler3.registry.registerCollection(spec.chainName, type=CollectionType.CHAINED)
chain = [spec.runName]
chain.extend(spec.parents)
chain.extend(rerunConverters[spec.runName].getCollectionChain())
for parent in spec.parents:
chain.append(spec.parent)
parentConverter = rerunConverters.get(parent)
if parentConverter is not None:
chain.extend(parentConverter.getCollectionChain())
chain.extend(rootConverter.getCollectionChain())
self.log.info("Defining %s from chain %s.", spec.chainName, chain)
self.butler3.registry.setCollectionChain(spec.chainName, chain)

0 comments on commit d64a390

Please sign in to comment.