diff --git a/pipelines/LATISS/ApPipe.yaml b/pipelines/LATISS/ApPipe.yaml
new file mode 100644
index 00000000..d1988828
--- /dev/null
+++ b/pipelines/LATISS/ApPipe.yaml
@@ -0,0 +1,8 @@
+description: Alert Production pipeline specialized for LATISS
+
+imports:
+  - location: $DRP_PIPE_DIR/pipelines/LATISS/DRP.yaml
+    include:
+      - isr
+      - characterizeImage
+      - calibrate
diff --git a/python/activator/activator.py b/python/activator/activator.py
index 117f0462..3b2795f8 100644
--- a/python/activator/activator.py
+++ b/python/activator/activator.py
@@ -43,6 +43,8 @@
 # The short name for the instrument.
 instrument_name = os.environ["RUBIN_INSTRUMENT"]
+# The skymap to use in the central repo
+skymap = os.environ["SKYMAP"]
 # URI to the main repository containing calibs and templates
 calib_repo = os.environ["CALIB_REPO"]
 # S3 Endpoint for Buckets; needed for direct Boto access but not Butler
@@ -86,6 +88,7 @@
 mwi = MiddlewareInterface(get_central_butler(calib_repo, instrument_name),
                           image_bucket,
                           instrument_name,
+                          skymap,
                           local_repos)
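Note (example, not part of the diff): the activator reads each variable with os.environ[...], so an unset variable raises KeyError as soon as the module is imported. A minimal sketch of a pre-flight check a deployment could run before launching the service; all variable values shown here are hypothetical:

    import os

    # Hypothetical values; a real deployment sets these in its service config.
    os.environ.update({
        "RUBIN_INSTRUMENT": "LATISS",
        "SKYMAP": "latiss_v1",  # new requirement introduced by this change
        "CALIB_REPO": "s3://example-bucket/central_repo",
    })

    # Fail fast with a single clear message instead of a KeyError per variable.
    missing = [name for name in ("RUBIN_INSTRUMENT", "SKYMAP", "CALIB_REPO")
               if name not in os.environ]
    if missing:
        raise RuntimeError(f"Cannot start activator; unset variables: {missing}")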
diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py
index 93f6c433..749770e8 100644
--- a/python/activator/middleware_interface.py
+++ b/python/activator/middleware_interface.py
@@ -111,6 +111,8 @@ class MiddlewareInterface:
         name or the short name. Examples: "LsstCam", "lsst.obs.lsst.LsstCam".
         TODO: this arg can probably be removed and replaced with internal
         use of the butler.
+    skymap : `str`
+        Name of the skymap in the central repo for querying templates.
     local_storage : `str`
         An absolute path to a space where this object can create a local
         Butler repo. The repo is guaranteed to be unique to this object.
@@ -120,9 +122,6 @@ class MiddlewareInterface:
     prefix : `str`, optional
         URI scheme followed by ``://``; appropriate for use in the USDF
         environment; typically only change this when running local tests.
     """
-    _COLLECTION_TEMPLATE = "templates"
-    """The collection used for templates.
-    """
     _COLLECTION_SKYMAP = "skymaps"
     """The collection used for skymaps.
     """
@@ -141,7 +140,7 @@ class MiddlewareInterface:
     # corresponding to self.camera and self.skymap.

     def __init__(self, central_butler: Butler, image_bucket: str, instrument: str,
-                 local_storage: str,
+                 skymap: str, local_storage: str,
                  prefix: str = "s3://"):
         # Deployment/version ID -- potentially expensive to generate.
         self._deployment = self._get_deployment()
@@ -170,11 +169,9 @@ def __init__(self, central_butler: Butler, image_bucket: str, instrument: str,
             "camera", instrument=self.instrument.getName(),
             collections=self.instrument.makeUnboundedCalibrationRunName()
         )
-        # TODO: is central_butler guaranteed to have only one skymap dimension?
-        skymaps = list(self.central_butler.registry.queryDataIds("skymap"))
-        assert len(skymaps) == 1, "Ambiguous or missing skymap in central repo."
-        self.skymap_name = skymaps[0]["skymap"]
-        self.skymap = self.central_butler.get("skyMap", skymap=self.skymap_name)
+        self.skymap_name = skymap
+        self.skymap = self.central_butler.get("skyMap", skymap=self.skymap_name,
+                                              collections=self._COLLECTION_SKYMAP)

         # How much to pad the refcat region we will copy over.
         self.padding = 30*lsst.geom.arcseconds
@@ -371,16 +368,23 @@ def prep_butler(self, visit: Visit) -> None:
             # TODO: Summit filter names may not match Butler names, especially for composite filters.
             self._export_skymap_and_templates(export, center, detector, wcs, visit.filters)
             self._export_calibs(export, visit.detector, visit.filters)
-
-            # CHAINED collections
-            export.saveCollection(self.instrument.makeRefCatCollectionName())
-            export.saveCollection(self._COLLECTION_TEMPLATE)
-            export.saveCollection(self.instrument.makeUmbrellaCollectionName())
+            self._export_collections(export, self.instrument.makeUmbrellaCollectionName())

         self.butler.import_(filename=export_file.name,
                             directory=self.central_butler.datastore.root,
                             transfer="copy")
+        # Temporary workaround until we have a prompt-processing default top-level
+        # collection in shared repos; then we can organize collections without
+        # worrying about DRP use cases.
+        _prepend_collection(self.butler,
+                            self.instrument.makeUmbrellaCollectionName(),
+                            [self._get_template_collection()])
+
+    def _get_template_collection(self):
+        """Get the collection name for templates.
+        """
+        return self.instrument.makeCollectionName("templates")

     def _export_refcats(self, export, center, radius):
         """Export the refcats for this visit from the central butler.
@@ -401,7 +405,8 @@ def _export_refcats(self, export, center, radius):
         # currently, and we can't queryDatasetTypes in just the refcats
         # collection, so we have to specify a list here. Replace this
         # with another solution ASAP.
-        possible_refcats = ["gaia", "panstarrs", "gaia_dr2_20200414", "ps1_pv3_3pi_20170110"]
+        possible_refcats = ["gaia", "panstarrs", "gaia_dr2_20200414", "ps1_pv3_3pi_20170110",
+                            "atlas_refcat2_20220201"]
         refcats = set(_filter_datasets(
             self.central_butler, self.butler,
             possible_refcats,
@@ -453,16 +458,21 @@ def _export_skymap_and_templates(self, export, center, detector, wcs, filter):
         # TODO: alternately, we need to extract it from the pipeline? (best?)
         # TODO: alternately, can we just assume that there is exactly
         # one coadd type in the central butler?
-        templates = set(_filter_datasets(
-            self.central_butler, self.butler,
-            "*Coadd",
-            collections=self._COLLECTION_TEMPLATE,
-            instrument=self.instrument.getName(),
-            skymap=self.skymap_name,
-            where=template_where,
-            findFirst=True))
-        _log.debug("Found %d new template datasets.", len(templates))
-        export.saveDatasets(templates)
+        try:
+            templates = set(_filter_datasets(
+                self.central_butler, self.butler,
+                "*Coadd",
+                collections=self._get_template_collection(),
+                instrument=self.instrument.getName(),
+                skymap=self.skymap_name,
+                where=template_where,
+                findFirst=True))
+        except _MissingDatasetError as err:
+            _log.error(err)
+        else:
+            _log.debug("Found %d new template datasets.", len(templates))
+            export.saveDatasets(templates)
+        self._export_collections(export, self._get_template_collection())

     def _export_calibs(self, export, detector_id, filter):
         """Export the calibs for this visit from the central butler.
@@ -495,11 +505,24 @@ def _export_calibs(self, export, detector_id, filter):
         export.saveDatasets(
             calibs,
             elements=[])  # elements=[] means do not export dimension records
-        target_types = {CollectionType.CALIBRATION}
-        for collection in self.central_butler.registry.queryCollections(...,
-                                                                        collectionTypes=target_types):
-            export.saveCollection(collection)
-        export.saveCollection(self.instrument.makeCalibrationCollectionName())
+
+    def _export_collections(self, export, collection):
+        """Export the collection and all its children.
+
+        This preserves the collection structure even if some child collections
+        do not have data. Exporting a collection does not export its datasets.
+
+        Parameters
+        ----------
+        export : `RepoExportContext`
+            Export context manager.
+        collection : `str`
+            The collection to be exported. It is usually a CHAINED collection
+            and can have many children.
+        """
+        for child in self.central_butler.registry.queryCollections(
+                collection, flattenChains=True, includeChains=True):
+            export.saveCollection(child)

     @staticmethod
     def _count_by_type(refs):
@@ -637,8 +660,12 @@ def _prep_pipeline(self, visit: Visit) -> lsst.pipe.base.Pipeline:
             pipeline = lsst.pipe.base.Pipeline.fromFile(ap_pipeline_file)
         except FileNotFoundError:
             raise RuntimeError(f"No ApPipe.yaml defined for camera {self.instrument.getName()}")
-        pipeline.addConfigOverride("diaPipe", "apdb.db_url", self._apdb_uri)
-        pipeline.addConfigOverride("diaPipe", "apdb.namespace", self._apdb_namespace)
+
+        try:
+            pipeline.addConfigOverride("diaPipe", "apdb.db_url", self._apdb_uri)
+            pipeline.addConfigOverride("diaPipe", "apdb.namespace", self._apdb_namespace)
+        except LookupError:
+            _log.debug("diaPipe is not in this pipeline.")
         return pipeline

     def _download(self, remote):
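Note (example, not part of the diff): the new _export_collections replaces the hard-coded saveCollection calls with a traversal of an entire collection chain. A standalone illustration of the same Registry query; the repo path and chain name are hypothetical:

    from lsst.daf.butler import Butler

    butler = Butler("/path/to/central_repo")
    # flattenChains=True recurses into CHAINED collections, and
    # includeChains=True keeps the chain entries themselves in the result,
    # so both the parent chains and all their children are re-created on import.
    for name in butler.registry.queryCollections(
            "DECam/defaults", flattenChains=True, includeChains=True):
        print(name)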
diff --git a/tests/data/central_repo/gen3.sqlite3 b/tests/data/central_repo/gen3.sqlite3
index 6b9f3950..a777c03a 100644
Binary files a/tests/data/central_repo/gen3.sqlite3 and b/tests/data/central_repo/gen3.sqlite3 differ
diff --git a/tests/data/export.yaml b/tests/data/export.yaml
index 5c8f73cf..be4bef09 100644
--- a/tests/data/export.yaml
+++ b/tests/data/export.yaml
@@ -1,13 +1,15 @@
-# Created from ap_verify_ci_hits2015 on 2022-03-16 using make_preloaded_export.py
-# and then modified to add the necessary collection CHAINs.
+# Created from ap_verify_ci_hits2015 on 2023-02-23 using make_preloaded_export.py
 description: Butler Data Repository Export
-version: 1.0.1
+version: 1.0.2
+universe_version: 3
+universe_namespace: daf_butler
 data:
 - type: dimension
   element: instrument
   records:
   - name: DECam
     visit_max: 33554432
+    visit_system: 0
     exposure_max: 33554432
     detector_max: 100
     class_name: lsst.obs.decam.DarkEnergyCamera
@@ -541,24 +543,6 @@ data:
     host: null
     timespan_begin: null
     timespan_end: null
-- type: collection
-  collection_type: CHAINED
-  name: refcats
-  children:
-  - refcats/gen2
-- type: collection
-  collection_type: CHAINED
-  name: templates
-  children:
-  - templates/deep
-- type: collection
-  collection_type: CHAINED
-  name: DECam/defaults
-  children:
-  - DECam/calib
-  - refcats
-  - templates
-  - skymaps
 - type: dataset_type
   name: camera
   dimensions:
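Note (example, not part of the diff): export files like export.yaml are written through a RepoExportContext. A minimal sketch of regenerating one, assuming a hypothetical local path for the source repo:

    from lsst.daf.butler import Butler

    src = Butler("/path/to/ap_verify_ci_hits2015/preloaded")
    with src.export(filename="export.yaml") as export:
        # Save every dataset reachable through the defaults chain; the
        # dimension records the datasets depend on are exported automatically.
        export.saveDatasets(
            src.registry.queryDatasets(..., collections="DECam/defaults"))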
diff --git a/tests/data/make_central_repo.sh b/tests/data/make_central_repo.sh
index ca5105c3..e85c0717 100644
--- a/tests/data/make_central_repo.sh
+++ b/tests/data/make_central_repo.sh
@@ -3,12 +3,15 @@
 # tests, making all the files be empty and adjusting the sqlite registry to
 # match.

-# This export file was created with this command, and then modified to have the
-# CHAINED collections that we expect (path is the repo on JKP's desktop):
-# make_preloaded_export.py --src-rep /data/ap_verify_ci_hits2015
+# This export file was created with this command, and then a comment was added on the first line:
+# make_preloaded_export.py --src-rep $AP_VERIFY_CI_HITS2015_DIR
 butler create central_repo
 butler import --export-file export.yaml --transfer copy central_repo/ $AP_VERIFY_CI_HITS2015_DIR/preloaded/
+butler collection-chain central_repo refcats refcats/gen2
+butler collection-chain central_repo DECam/templates templates/deep
+butler collection-chain central_repo DECam/defaults DECam/calib,refcats,DECam/templates,skymaps
+
 # Empty out files and make them size 0 in the registry.
 # We do not empty the camera description in ``DECam/calib/unbounded``
 # because we need it to load the camera geometry.
@@ -19,4 +22,6 @@ find central_repo/DECam/calib/20150313T000000Z -name "*.fits" -execdir sh -c '>
 find central_repo/DECam/calib/curated -name "*.fits" -execdir sh -c '> "$1"' -- {} \;
 # find central_repo/skymaps -name "*.fits" -execdir sh -c '> "$1"' -- {} \;

-sqlite3 central_repo/gen3.sqlite3 "update file_datastore_records set file_size=0;"
+# The camera and skymap files are not emptied out. Their records need to be consistent
+# so that the datastore does not complain about a size mismatch when reading them.
+sqlite3 central_repo/gen3.sqlite3 'update file_datastore_records set file_size=0 where path != "DECam/calib/unbounded/camera/camera_DECam_DECam_calib_unbounded.fits" and path != "skymaps/skyMap/skyMap_deepCoadd_skyMap_skymaps.pickle";'
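Note (example, not part of the diff): the new butler collection-chain calls recreate the CHAINED collections that were dropped from export.yaml. The equivalent through the Python Registry API, with a hypothetical repo path:

    from lsst.daf.butler import Butler, CollectionType

    butler = Butler("central_repo", writeable=True)
    registry = butler.registry
    # Register each chain, then point it at its children, mirroring the
    # three collection-chain commands in the script above.
    for parent, children in {
        "refcats": ["refcats/gen2"],
        "DECam/templates": ["templates/deep"],
        "DECam/defaults": ["DECam/calib", "refcats", "DECam/templates", "skymaps"],
    }.items():
        registry.registerCollection(parent, CollectionType.CHAINED)
        registry.setCollectionChain(parent, children)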
diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py
index fcfd63ea..4cc58360 100644
--- a/tests/test_middleware_interface.py
+++ b/tests/test_middleware_interface.py
@@ -48,6 +48,8 @@
 instname = "DECam"
 # Full name of the physical filter for the test file.
 filter = "g DECam SDSS c0001 4720.0 1520.0"
+# The skymap name used in the test repo.
+skymap_name = "deepCoadd_skyMap"


 def fake_file_data(filename, dimensions, instrument, visit):
@@ -133,7 +135,7 @@ def setUp(self):
         # Have to preserve the tempdir, so that it doesn't get cleaned up.
         self.workspace = tempfile.TemporaryDirectory()
         self.interface = MiddlewareInterface(central_butler, self.input_data, instrument,
-                                             self.workspace.name,
+                                             skymap_name, self.workspace.name,
                                              prefix="file://")

         # coordinates from DECam data in ap_verify_ci_hits2015 for visit 411371
@@ -171,7 +173,7 @@ def test_get_butler(self):
         ]:
             # TODO: better way to test repo location?
             self.assertTrue(
-                butler.getURI("skyMap", skymap="deepCoadd_skyMap", run="foo", predict=True).ospath
+                butler.getURI("skyMap", skymap=skymap_name, run="foo", predict=True).ospath
                 .startswith(self.central_repo))
             self.assertEqual(list(butler.collections), [f"{instname}/defaults"])
             self.assertTrue(butler.isWriteable())
@@ -203,7 +205,7 @@ def _check_imports(self, butler, detector, expected_shards):
         # Check that the right skymap is in the chained output collection.
         self.assertTrue(
             butler.datasetExists("skyMap",
-                                 skymap="deepCoadd_skyMap",
+                                 skymap=skymap_name,
                                  collections=self.interface.output_collection)
         )
@@ -241,7 +243,7 @@ def _check_imports(self, butler, detector, expected_shards):
         butler.registry.refresh()
         for patch in (464, 465, 509, 510):
             butler.datasetExists('deepCoadd', tract=0, patch=patch, band="g",
-                                 skymap="deepCoadd_skyMap",
+                                 skymap=skymap_name,
                                  collections=self.interface.output_collection)

     def test_prep_butler(self):
@@ -592,7 +594,7 @@ def setUp(self):
         self._create_copied_repo()
         central_butler = Butler(self.central_repo.name,
                                 instrument=instname,
-                                skymap="deepCoadd_skyMap",
+                                skymap=skymap_name,
                                 collections=[f"{instname}/defaults"],
                                 writeable=True)
         instrument = "lsst.obs.decam.DarkEnergyCamera"
@@ -628,7 +630,8 @@ def setUp(self):
         self.logger_name = "lsst.activator.middleware_interface"

         # Populate repository.
-        self.interface = MiddlewareInterface(central_butler, self.input_data, instrument, workspace.name,
+        self.interface = MiddlewareInterface(central_butler, self.input_data, instrument,
+                                             skymap_name, workspace.name,
                                              prefix="file://")
         # TODO: should MiddlewareInterface have a cleanup method?
         self.addCleanup(tempfile.TemporaryDirectory.cleanup, self.interface._repo)
@@ -675,6 +678,23 @@ def _count_datasets(self, butler, types, collections):
     def _count_datasets_with_id(self, butler, types, collections, data_id):
         return len(set(butler.registry.queryDatasets(types, collections=collections, dataId=data_id)))

+    def test_extra_collection(self):
+        """Test that extra collections in the chain do not lead to
+        MissingCollectionError, even if they carry no useful data.
+        """
+        central_butler = Butler(self.central_repo.name, writeable=True)
+        central_butler.registry.registerCollection("emptyrun", CollectionType.RUN)
+        _prepend_collection(central_butler, "refcats", ["emptyrun"])
+
+        self.interface.prep_butler(self.next_visit)
+
+        self.assertEqual(
+            self._count_datasets(self.interface.butler, "gaia", f"{instname}/defaults"),
+            3)
+        self.assertIn(
+            "emptyrun",
+            self.interface.butler.registry.queryCollections("refcats", flattenChains=True))
+
     def test_export_outputs(self):
         self.interface.export_outputs(self.next_visit, {self.raw_data_id["exposure"]})
diff --git a/ups/prompt_prototype.table b/ups/prompt_prototype.table
index 96e24dd8..8b8b87e9 100644
--- a/ups/prompt_prototype.table
+++ b/ups/prompt_prototype.table
@@ -8,6 +8,8 @@ setupRequired(utils)   # Used by scripts in bin.src

 # Used by middleware_interface module due to hard-coded pipeline references.
 setupRequired(ap_pipe)
+# Borrow LATISS configs from drp_pipe
+setupRequired(drp_pipe)
 setupRequired(daf_butler)
 setupRequired(ctrl_mpexec)
 setupRequired(geom)
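Note (example, not part of the diff): _prepend_collection is called by prep_butler and by the new test, but its definition is not shown in this patch. A plausible sketch, assuming it wraps the Registry chain APIs:

    def _prepend_collection(butler, chain, new_collections):
        """Add collections to the head of a CHAINED collection."""
        # getCollectionChain raises MissingCollectionError if ``chain`` does
        # not exist; callers are expected to have created it beforehand.
        old_chain = butler.registry.getCollectionChain(chain)
        butler.registry.setCollectionChain(
            chain, list(new_collections) + list(old_chain))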