
Commit

Merge pull request #372 from lsst/tickets/DM-28636
DM-28636: various improvements to help with shared repo setup
TallJimbo committed Apr 6, 2021
2 parents 5bec2f1 + 7a54d1e commit 6f8f15d
Showing 4 changed files with 87 additions and 26 deletions.
3 changes: 2 additions & 1 deletion python/lsst/obs/base/gen2to3/calibRepoConverter.py
@@ -30,7 +30,7 @@
import astropy.time
import astropy.units as u

from lsst.daf.butler import DataCoordinate, FileDataset, Timespan
from lsst.daf.butler import CollectionType, DataCoordinate, FileDataset, Timespan
from .repoConverter import RepoConverter
from .repoWalker import RepoWalker

@@ -291,6 +291,7 @@ def _finish(self, datasets: Mapping[DatasetType, Mapping[Optional[str], List[Fil
self.task.log.warn(msg)

# Done reading from Gen2, time to certify into Gen3.
self.task.registry.registerCollection(self.collection, type=CollectionType.CALIBRATION)
for timespan, refs in refsByTimespan.items():
self.task.registry.certify(self.collection, refs, timespan)

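The added registerCollection line ensures the target CALIBRATION collection exists before certify is called, so the converter no longer assumes it was created elsewhere. A minimal sketch of this register-then-certify pattern (the repo path, collection name, and refsByTimespan contents are assumptions for illustration, not from this commit):

from lsst.daf.butler import Butler, CollectionType

butler = Butler("/path/to/gen3/repo", writeable=True)  # hypothetical repo
collection = "HSC/calib/gen2"  # hypothetical collection name
# Idempotent: creates the collection if needed; a no-op if it already
# exists with the same type.
butler.registry.registerCollection(collection, type=CollectionType.CALIBRATION)
# Maps validity-range Timespans to lists of DatasetRefs; assumed to be
# populated the same way _finish builds it above.
refsByTimespan = {}
for timespan, refs in refsByTimespan.items():
    butler.registry.certify(collection, refs, timespan)
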
90 changes: 70 additions & 20 deletions python/lsst/obs/base/gen2to3/convertRepo.py
@@ -349,6 +349,15 @@ class ConvertRepoConfig(Config):
dtype=str,
default=[]
)
datasetTemplateOverrides = DictField(
"Overrides for Gen2 filename templates, keyed by dataset type. "
"This can be used to support conversions of Gen2 repos whose mapper "
"templates were modified in obs_* packages since the datasets were "
"written.",
keytype=str,
itemtype=str,
default={},
)
ccdKey = Field(
"Key used for the Gen2 equivalent of 'detector' in data IDs.",
dtype=str,
@@ -361,6 +370,16 @@
dtype=bool,
default=False,
)
doExpandDataIds = Field(
"If True (default), expand data IDs to include extra metadata before "
"ingesting them. "
"This may be required in order to associate calibration datasets with "
"validity ranges or populate file templates, so setting this to False "
"is considered advanced usage (and it may not always work). When it "
"does, it can provide a considerable speedup.",
dtype=bool,
default=True,
)
doMakeUmbrellaCollection = Field(
"If True (default), define an '<instrument>/defaults' CHAINED "
"collection that includes everything found in the root repo as well "
@@ -386,6 +405,11 @@ def transfer(self, value):
def setDefaults(self):
self.transfer = None

def validate(self):
super().validate()
if self.relatedOnly and not self.doExpandDataIds:
raise ValueError("relatedOnly requires doExpandDataIds.")


class ConvertRepoTask(Task):
"""A task that converts one or more related Gen2 data repositories to a
@@ -405,6 +429,12 @@ class ConvertRepoTask(Task):
is converted and ``doWriteCuratedCalibrations`` is `True`.
instrument : `lsst.obs.base.Instrument`
The Gen3 instrument that should be used for this conversion.
dry_run : `bool`, optional
If `True` (`False` is default), make no changes to the Gen3 data
repository while running as many steps as possible. This option is
best used with a read-only ``butler3`` argument to ensure unexpected
edge cases respect this argument (and fail rather than write if they
do not).
**kwargs
Other keyword arguments are forwarded to the `Task` constructor.
@@ -423,7 +453,8 @@ class ConvertRepoTask(Task):

_DefaultName = "convertRepo"

def __init__(self, config=None, *, butler3: Butler3, instrument: Instrument, **kwargs):
def __init__(self, config=None, *, butler3: Butler3, instrument: Instrument, dry_run: bool = False,
**kwargs):
config.validate() # Not a CmdlineTask nor PipelineTask, so have to validate the config here.
super().__init__(config, **kwargs)
# Make self.butler3 one that doesn't have any collections associated
@@ -448,6 +479,7 @@ def __init__(self, config=None, *, butler3: Butler3, instrument: Instrument, **k
self._usedSkyPix = set()
self.translatorFactory = self.instrument.makeDataIdTranslatorFactory()
self.translatorFactory.log = self.log.getChild("translators")
self.dry_run = dry_run

def _reduce_kwargs(self):
# Add extra parameters to pickle
@@ -532,7 +564,13 @@ def registerUsedSkyMaps(self, subset: Optional[ConversionSubset]):
"""
for struct in self._configuredSkyMapsBySha1.values():
if struct.used:
struct.instance.register(struct.name, self.butler3)
if not self.dry_run:
try:
# If the skymap isn't registered, this will raise.
self.butler3.registry.expandDataId(skymap=struct.name)
except LookupError:
self.log.info("Registering skymap %s.", struct.name)
struct.instance.register(struct.name, self.butler3)
if subset is not None and self.config.relatedOnly:
subset.addSkyMap(self.registry, struct.name)
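The probe above relies on Registry.expandDataId raising LookupError when a skymap is not registered, so the converter only registers skymaps that are genuinely missing. A standalone sketch of the same existence check (repo path and skymap name are hypothetical):

from lsst.daf.butler import Butler

butler = Butler("/path/to/gen3/repo")  # hypothetical path
try:
    # Raises LookupError if no skymap record with this name exists.
    butler.registry.expandDataId(skymap="hsc_rings_v1")  # hypothetical name
    print("skymap already registered")
except LookupError:
    print("skymap not registered yet")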

@@ -601,13 +639,20 @@ def run(self, root: str, *,
pool = Pool(processes)
if calibs is None:
calibs = [CalibRepo(path=None)]
elif calibs and not self.config.doExpandDataIds:
raise ValueError("Cannot convert calib repos with config.doExpandDataIds=False.")
if visits is not None:
subset = ConversionSubset(instrument=self.instrument.getName(), visits=frozenset(visits))
else:
if self.config.relatedOnly:
self.log.warn("config.relatedOnly is True but all visits are being ingested; "
"no filtering will be done.")
subset = None
if (not self.config.doExpandDataIds
and self.butler3.datastore.needs_expanded_data_ids(self.config.transfer)):
self.log.warn("config.doExpandDataIds=False but datastore reports that expanded data "
"IDs may be needed for transfer mode %s.",
self.config.transfer)

# Check that at most one CalibRepo is marked as default, to fail before
# we actually write anything.
@@ -649,8 +694,12 @@ def run(self, root: str, *,
converters.append(converter)
rerunConverters[spec.runName] = converter

# Walk Gen2 repos to find datasets to convert.
for converter in converters:
converter.prep()

# Register the instrument if we're configured to do so.
if self.config.doRegisterInstrument:
if self.config.doRegisterInstrument and not self.dry_run:
self.instrument.register(self.registry)

# Run raw ingest (does nothing if we weren't configured to convert the
@@ -661,26 +710,23 @@
# were requested (which may be implicit, by passing calibs=None). Also
# set up a CHAINED collection that points to the default CALIBRATION
# collection if one is needed.
for spec in calibs:
if spec.curated:
self.instrument.writeCuratedCalibrations(self.butler3, labels=spec.labels)
if spec.default and spec.labels:
# This is guaranteed to be True at most once in the loop by
# logic at the top of this method.
defaultCalibName = self.instrument.makeCalibrationCollectionName()
self.butler3.registry.registerCollection(defaultCalibName, CollectionType.CHAINED)
recommendedCalibName = self.instrument.makeCalibrationCollectionName(*spec.labels)
self.butler3.registry.registerCollection(recommendedCalibName, CollectionType.CALIBRATION)
self.butler3.registry.setCollectionChain(defaultCalibName, [recommendedCalibName])
if not self.dry_run:
for spec in calibs:
if spec.curated:
self.instrument.writeCuratedCalibrations(self.butler3, labels=spec.labels)
if spec.default and spec.labels:
# This is guaranteed to be True at most once in the loop by
# logic at the top of this method.
defaultCalibName = self.instrument.makeCalibrationCollectionName()
self.butler3.registry.registerCollection(defaultCalibName, CollectionType.CHAINED)
recommendedCalibName = self.instrument.makeCalibrationCollectionName(*spec.labels)
self.butler3.registry.registerCollection(recommendedCalibName, CollectionType.CALIBRATION)
self.butler3.registry.setCollectionChain(defaultCalibName, [recommendedCalibName])

# Define visits (also does nothing if we weren't configured to convert
# the 'raw' dataset type).
rootConverter.runDefineVisits(pool=pool)

# Walk Gen2 repos to find datasets to convert.
for converter in converters:
converter.prep()

# Insert dimensions that are potentially shared by all Gen2
# repositories (and are hence managed directly by the Task, rather
# than a converter instance).
@@ -696,8 +742,12 @@ def run(self, root: str, *,
converter.findDatasets()

# Expand data IDs.
for converter in converters:
converter.expandDataIds()
if self.config.doExpandDataIds:
for converter in converters:
converter.expandDataIds()

if self.dry_run:
return

# Actually ingest datasets.
for converter in converters:
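Together, the new dry_run argument and doExpandDataIds option allow a fast rehearsal of a conversion that stops before anything is written. A usage sketch under assumed names (the instrument choice, paths, and config values are illustrative, not from this commit):

from lsst.daf.butler import Butler
from lsst.obs.base.gen2to3 import ConvertRepoConfig, ConvertRepoTask
from lsst.obs.subaru import HyperSuprimeCam  # hypothetical instrument choice

config = ConvertRepoConfig()
config.doExpandDataIds = False  # advanced usage: skip expansion for speed
# Open the Gen3 repo read-only so any code path that ignores dry_run
# fails instead of writing.
butler3 = Butler("/path/to/gen3/repo", writeable=False)
task = ConvertRepoTask(config=config, butler3=butler3,
                       instrument=HyperSuprimeCam(), dry_run=True)
task.run(root="/path/to/gen2/repo")
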
2 changes: 1 addition & 1 deletion python/lsst/obs/base/gen2to3/repoConverter.py
@@ -335,7 +335,7 @@ class implementation at some point in their own logic.
walkerInputs: List[Union[RepoWalker.Target, RepoWalker.Skip]] = []
for datasetTypeName, mapping in self.iterMappings():
try:
template = mapping.template
template = self.task.config.datasetTemplateOverrides.get(datasetTypeName, mapping.template)
except RuntimeError:
# No template for this dataset in this mapper, so there's no
# way there should be instances of this dataset in this repo.
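Because the override is applied with dict.get above, a configured template wins only for dataset types explicitly listed; all other dataset types keep the mapper's own template. A hedged config-override sketch (the dataset type name and template string are made-up examples):

# In a ConvertRepoConfig override file (hypothetical values):
config.datasetTemplateOverrides = {
    "calexp": "%(pointing)05d/%(filter)s/corr/CORR-%(visit)07d-%(ccd)03d.fits",
}
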
18 changes: 14 additions & 4 deletions python/lsst/obs/base/gen2to3/rootRepoConverter.py
@@ -108,6 +108,7 @@ def findMatchingSkyMap(self, datasetTypeName: str) -> Tuple[Optional[BaseSkyMap]

def runRawIngest(self, pool=None):
if self.task.raws is None:
self.task.log.info(f"Skipping raw ingest for {self.root}.")
return
self.task.log.info(f"Finding raws in root {self.root}.")
if self.subset is not None:
@@ -118,17 +119,26 @@
else:
dataRefs = self.butler2.subset(self.task.config.rawDatasetType)
dataPaths = getDataPaths(dataRefs)
self.task.log.info("Ingesting raws from root %s into run %s.", self.root, self.task.raws.butler.run)
self._rawRefs.extend(self.task.raws.run(dataPaths, pool=pool))
if not self.task.dry_run:
self.task.log.info("Ingesting raws from root %s into run %s.",
self.root, self.task.raws.butler.run)
self._rawRefs.extend(self.task.raws.run(dataPaths, pool=pool))
else:
self.task.log.info("[dry run] skipping ingesting raws from root %s into run %s.",
self.root, self.task.raws.butler.run)
self._chain = [self.task.raws.butler.run]

def runDefineVisits(self, pool=None):
if self.task.defineVisits is None:
self.task.log.info(f"Skipping visit definition for {self.root}.")
return
dimensions = DimensionGraph(self.task.universe, names=["exposure"])
exposureDataIds = set(ref.dataId.subset(dimensions) for ref in self._rawRefs)
self.task.log.info("Defining visits from exposures.")
self.task.defineVisits.run(exposureDataIds, pool=pool)
if not self.task.dry_run:
self.task.log.info("Defining visits from exposures.")
self.task.defineVisits.run(exposureDataIds, pool=pool)
else:
self.task.log.info("[dry run] Skipping defining visits from exposures.")

def prep(self):
# Docstring inherited from RepoConverter.
