diff --git a/python/activator/activator.py b/python/activator/activator.py index 11af70ef..94e31b1f 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -84,13 +84,9 @@ # However, we don't want MiddlewareInterface to need to know details like where # the central repo is located, either, so perhaps we need a new module. central_butler = Butler(calib_repo, - instrument=active_instrument.getName(), - # NOTE: with inferDefaults=True, it's possible we don't need to hardcode this - # value from the real repository. - # skymap="hsc_rings_v1", collections=[active_instrument.makeCollectionName("defaults")], writeable=False, - inferDefaults=True) + inferDefaults=False) repo = f"/tmp/butler-{os.getpid()}" butler = Butler(Butler.makeRepo(repo), writeable=True) _log.info("Created local Butler repo at %s.", repo) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index 908facc9..6157493e 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -106,7 +106,6 @@ def __init__(self, central_butler: Butler, image_bucket: str, instrument: str, define_visits_config = lsst.obs.base.DefineVisitsConfig() self.define_visits = lsst.obs.base.DefineVisitsTask(config=define_visits_config, butler=self.butler) - # TODO DM-34098: note that we currently need to supply instrument here. # HACK: explicit collection gets around the fact that we don't have any # timestamp/exposure information in a form we can pass to the Butler. # This code will break once cameras start being versioned. @@ -114,7 +113,11 @@ def __init__(self, central_butler: Butler, image_bucket: str, instrument: str, "camera", instrument=self.instrument.getName(), collections=self.instrument.makeUnboundedCalibrationRunName() ) - self.skymap = self.central_butler.get("skyMap") + # TODO: is central_butler guaranteed to have only one skymap dimension? + skymaps = list(self.central_butler.registry.queryDataIds("skymap")) + assert len(skymaps) == 1, "Ambiguous or missing skymap in central repo." + self.skymap_name = skymaps[0]["skymap"] + self.skymap = self.central_butler.get("skyMap", skymap=self.skymap_name) # How much to pad the refcat region we will copy over. self.padding = 30*lsst.geom.arcseconds @@ -216,7 +219,7 @@ def prep_butler(self, visit: Visit) -> None: with tempfile.NamedTemporaryFile(mode="w+b", suffix=".yaml") as export_file: with self.central_butler.export(filename=export_file.name, format="yaml") as export: self._export_refcats(export, center, radius) - self._export_skymap_and_templates(export, center, detector, wcs) + self._export_skymap_and_templates(export, center, detector, wcs, visit.filter) self._export_calibs(export, visit.detector, visit.filter) # CHAINED collections @@ -244,9 +247,8 @@ def _export_refcats(self, export, center, radius): Radius to search for refcat shards in. """ indexer = HtmIndexer(depth=7) - shards = indexer.getShardIds(center, radius+self.padding) - # getShardIds returns a tuple, the first item is the ids list. - htm_where = f"htm7 in ({','.join(str(x) for x in shards[0])})" + shard_ids, _ = indexer.getShardIds(center, radius+self.padding) + htm_where = f"htm7 in ({','.join(str(x) for x in shard_ids)})" # Get shards from all refcats that overlap this detector. # TODO: `...` doesn't work for this queryDatasets call # currently, and we can't queryDatasetTypes in just the refcats @@ -262,7 +264,7 @@ def _export_refcats(self, export, center, radius): _log.debug("Found %d new refcat datasets.", len(refcats)) export.saveDatasets(refcats) - def _export_skymap_and_templates(self, export, center, detector, wcs): + def _export_skymap_and_templates(self, export, center, detector, wcs, filter): """Export the skymap and templates for this visit from the central butler. @@ -276,11 +278,14 @@ def _export_skymap_and_templates(self, export, center, detector, wcs): Detector we are loading data for. wcs : `lsst.afw.geom.SkyWcs` Rough WCS for the upcoming visit, to help finding patches. + filter : `str` + Physical filter for which to export templates. """ # TODO: This exports the whole skymap, but we want to only export the # subset of the skymap that covers this data. skymaps = set(_query_missing_datasets(self.central_butler, self.butler, "skyMap", + skymap=self.skymap_name, collections=self._COLLECTION_SKYMAP, findFirst=True)) _log.debug("Found %d new skymap datasets.", len(skymaps)) @@ -294,7 +299,7 @@ def _export_skymap_and_templates(self, export, center, detector, wcs): points.append(wcs.pixelToSky(corner)) patches = tract.findPatchList(points) patches_str = ','.join(str(p.sequential_index) for p in patches) - template_where = f"patch in ({patches_str}) and tract={tract.tract_id}" + template_where = f"patch in ({patches_str}) and tract={tract.tract_id} and physical_filter='{filter}'" # TODO: do we need to have the coadd name used in the pipeline # specified as a class kwarg, so that we only load one here? # TODO: alternately, we need to extract it from the pipeline? (best?) @@ -303,6 +308,8 @@ def _export_skymap_and_templates(self, export, center, detector, wcs): templates = set(_query_missing_datasets(self.central_butler, self.butler, "*Coadd", collections=self._COLLECTION_TEMPLATE, + instrument=self.instrument.getName(), + skymap=self.skymap_name, where=template_where)) _log.debug("Found %d new template datasets.", len(templates)) export.saveDatasets(templates) @@ -326,6 +333,7 @@ def _export_calibs(self, export, detector_id, filter): self.central_butler, self.butler, ..., collections=self.instrument.makeCalibrationCollectionName(), + instrument=self.instrument.getName(), where=calib_where)) if calibs: for dataset_type, n_datasets in self._count_by_type(calibs): @@ -485,7 +493,8 @@ def run_pipeline(self, visit: Visit, exposure_ids: set) -> None: raise RuntimeError("No data to process.") from e # TODO: can we move this from_pipeline call to prep_butler? - where = f"detector={visit.detector} and exposure in ({','.join(str(x) for x in exposure_ids)})" + where = f"instrument='{visit.instrument}' and detector={visit.detector} " \ + f"and exposure in ({','.join(str(x) for x in exposure_ids)})" executor = SimplePipelineExecutor.from_pipeline(self.pipeline, where=where, butler=self.butler) if len(executor.quantum_graph) == 0: # TODO: a good place for a custom exception? @@ -515,6 +524,7 @@ def _query_missing_datasets(src_repo: Butler, dest_repo: Butler, *args, **kwargs Parameters for describing the dataset query. They have the same meanings as the parameters of `lsst.daf.butler.Registry.queryDatasets`. + The query must be valid for both ``src_repo`` and ``dest_repo``. Returns ------- diff --git a/python/activator/visit.py b/python/activator/visit.py index 1df0dfc8..793b135e 100644 --- a/python/activator/visit.py +++ b/python/activator/visit.py @@ -6,11 +6,11 @@ @dataclass(frozen=True) class Visit: # elements must be hashable and JSON-persistable; built-in types recommended - instrument: str + instrument: str # short name detector: int - group: str - snaps: int - filter: str + group: str # observatory-specific ID; not the same as visit number + snaps: int # number of snaps expected + filter: str # physical filter # all angles are in degrees ra: float dec: float diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index 21d386fd..6eb1a6ff 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -110,11 +110,11 @@ def setUp(self): data_dir = os.path.join(os.path.abspath(os.path.dirname(__file__)), "data") central_repo = os.path.join(data_dir, "central_repo") central_butler = Butler(central_repo, - instrument=instname, - skymap="deepCoadd_skyMap", collections=[f"{instname}/defaults"], - writeable=False) + writeable=False, + inferDefaults=False) instrument = "lsst.obs.decam.DarkEnergyCamera" + instrument_name = "DECam" self.input_data = os.path.join(data_dir, "input_data") # Have to preserve the tempdir, so that it doesn't get cleaned up. self.repo = tempfile.TemporaryDirectory() @@ -127,7 +127,7 @@ def setUp(self): dec = -4.950050405424033 # DECam has no rotator; instrument angle is 90 degrees in our system. rot = 90. - self.next_visit = Visit(instrument, + self.next_visit = Visit(instrument_name, detector=56, group=1, snaps=1, @@ -169,7 +169,6 @@ def _check_imports(self, butler, detector, expected_shards): # Check that the right skymap is in the chained output collection. self.assertTrue( butler.datasetExists("skyMap", - # TODO: we shouldn't need skymap here? skymap="deepCoadd_skyMap", collections=self.interface.output_collection) ) @@ -208,7 +207,6 @@ def _check_imports(self, butler, detector, expected_shards): butler.registry.refresh() for patch in (464, 465, 509, 510): butler.datasetExists('deepCoadd', tract=0, patch=patch, band="g", - # TODO: we shouldn't need skymap here? skymap="deepCoadd_skyMap", collections=self.interface.output_collection)