From 021967c0fde22efa252d1b9669e29051e1db0082 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Tue, 19 Jul 2022 15:39:02 -0500 Subject: [PATCH 01/18] Fix typo in raw upload script. --- bin/prompt_prototype_upload_raws.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bin/prompt_prototype_upload_raws.sh b/bin/prompt_prototype_upload_raws.sh index ab8307d3..e821a284 100755 --- a/bin/prompt_prototype_upload_raws.sh +++ b/bin/prompt_prototype_upload_raws.sh @@ -51,6 +51,6 @@ gsutil cp "${RAW_DIR}/HSCA05915116.fits" \ gs://${UPLOAD_BUCKET}/HSC/58/2016030700003/0/0059150/HSC-G/HSC-2016030700003-0-0059150-HSC-G-58.fits gsutil cp "${RAW_DIR}/HSCA05916109.fits" \ - gs://${UPLOAD_BUCKET}/HSC/43/2016030700004/0/0059150/HSC-G/HSC-2016030700004-0-0059160-HSC-G-43.fits + gs://${UPLOAD_BUCKET}/HSC/43/2016030700004/0/0059160/HSC-G/HSC-2016030700004-0-0059160-HSC-G-43.fits gsutil cp "${RAW_DIR}/HSCA05916113.fits" \ - gs://${UPLOAD_BUCKET}/HSC/51/2016030700004/0/0059150/HSC-G/HSC-2016030700004-0-0059160-HSC-G-51.fits + gs://${UPLOAD_BUCKET}/HSC/51/2016030700004/0/0059160/HSC-G/HSC-2016030700004-0-0059160-HSC-G-51.fits From 4d4802036eaa221059f5e9e694460a28e1063103 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Wed, 13 Jul 2022 17:00:48 -0500 Subject: [PATCH 02/18] Refactor WCS calculation from prep_butler. --- python/activator/middleware_interface.py | 34 ++++++++++++++++++------ 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index 793dd1f6..bd27b382 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -144,26 +144,44 @@ def _init_ingester(self): self.rawIngestTask = lsst.obs.base.RawIngestTask(config=config, butler=self.butler) + def _predict_wcs(self, detector: lsst.afw.cameraGeom.Detector, visit: Visit) -> lsst.afw.geom.SkyWcs: + """Calculate the expected detector WCS for an incoming observation. + + Parameters + ---------- + detector : `lsst.afw.cameraGeom.Detector` + The detector for which to generate a WCS. + visit : `Visit` + Predicted observation metadata for the detector. + + Returns + ------- + wcs : `lsst.afw.geom.SkyWcs` + An approximate WCS for ``visit``. + """ + boresight_center = lsst.geom.SpherePoint(visit.ra, visit.dec, lsst.geom.degrees) + orientation = lsst.geom.Angle(visit.rot, lsst.geom.degrees) + flip_x = True if self.instrument.getName() == "DECam" else False + return lsst.obs.base.createInitialSkyWcsFromBoresight(boresight_center, + orientation, + detector, + flipX=flip_x) + def prep_butler(self, visit: Visit) -> None: """Prepare a temporary butler repo for processing the incoming data. Parameters ---------- - visit : Visit + visit : `Visit` Group of snaps from one detector to prepare the butler for. """ _log.info(f"Preparing Butler for visit '{visit}'") with tempfile.NamedTemporaryFile(mode="w+b", suffix=".yaml") as export_file: with self.central_butler.export(filename=export_file.name, format="yaml") as export: - boresight_center = lsst.geom.SpherePoint(visit.ra, visit.dec, lsst.geom.degrees) - orientation = lsst.geom.Angle(visit.rot, lsst.geom.degrees) detector = self.camera[visit.detector] - flip_x = True if self.instrument.getName() == "DECam" else False - wcs = lsst.obs.base.createInitialSkyWcsFromBoresight(boresight_center, - orientation, - detector, - flipX=flip_x) + wcs = self._predict_wcs(detector, visit) + # Compute the maximum sky circle that contains the detector. radii = [] center = wcs.pixelToSky(detector.getCenter(lsst.afw.cameraGeom.PIXELS)) From 54cbce9d80e1bd9b74549f11f2c0a76c1645e842 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Wed, 13 Jul 2022 17:34:43 -0500 Subject: [PATCH 03/18] Refactor bounding circle calculation from prep_butler. --- python/activator/middleware_interface.py | 35 +++++++++++++++++++----- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index bd27b382..edf26ff8 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -167,6 +167,33 @@ def _predict_wcs(self, detector: lsst.afw.cameraGeom.Detector, visit: Visit) -> detector, flipX=flip_x) + def _detector_bounding_circle(self, detector: lsst.afw.cameraGeom.Detector, + wcs: lsst.afw.geom.SkyWcs + ) -> (lsst.geom.SpherePoint, lsst.geom.Angle): + # Could return a sphgeom.Circle, but that would require a lot of + # sphgeom->geom conversions downstream. Even their Angles are different! + """Compute a small sky circle that contains the detector. + + Parameters + ---------- + detector : `lsst.afw.cameraGeom.Detector` + The detector for which to compute an on-sky bounding circle. + wcs : `lsst.afw.geom.SkyWcs` + The conversion from detector to sky coordinates. + + Returns + ------- + center : `lsst.geom.SpherePoint` + The center of the bounding circle. + radius : `lsst.geom.Angle` + The opening angle of the bounding circle. + """ + radii = [] + center = wcs.pixelToSky(detector.getCenter(lsst.afw.cameraGeom.PIXELS)) + for corner in detector.getCorners(lsst.afw.cameraGeom.PIXELS): + radii.append(wcs.pixelToSky(corner).separation(center)) + return center, max(radii) + def prep_butler(self, visit: Visit) -> None: """Prepare a temporary butler repo for processing the incoming data. @@ -181,13 +208,7 @@ def prep_butler(self, visit: Visit) -> None: with self.central_butler.export(filename=export_file.name, format="yaml") as export: detector = self.camera[visit.detector] wcs = self._predict_wcs(detector, visit) - - # Compute the maximum sky circle that contains the detector. - radii = [] - center = wcs.pixelToSky(detector.getCenter(lsst.afw.cameraGeom.PIXELS)) - for corner in detector.getCorners(lsst.afw.cameraGeom.PIXELS): - radii.append(wcs.pixelToSky(corner).separation(center)) - radius = max(radii) + center, radius = self._detector_bounding_circle(detector, wcs) self._export_refcats(export, center, radius) self._export_skymap_and_templates(export, center, detector, wcs) From ad7aa53d7314b207ea71b1861cc3af6d5d599355 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Wed, 13 Jul 2022 17:36:48 -0500 Subject: [PATCH 04/18] Refactor calculations out of prep_butler's export/import block. --- python/activator/middleware_interface.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index edf26ff8..16b3d42b 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -204,12 +204,12 @@ def prep_butler(self, visit: Visit) -> None: """ _log.info(f"Preparing Butler for visit '{visit}'") + detector = self.camera[visit.detector] + wcs = self._predict_wcs(detector, visit) + center, radius = self._detector_bounding_circle(detector, wcs) + with tempfile.NamedTemporaryFile(mode="w+b", suffix=".yaml") as export_file: with self.central_butler.export(filename=export_file.name, format="yaml") as export: - detector = self.camera[visit.detector] - wcs = self._predict_wcs(detector, visit) - center, radius = self._detector_bounding_circle(detector, wcs) - self._export_refcats(export, center, radius) self._export_skymap_and_templates(export, center, detector, wcs) self._export_calibs(export, visit.detector, visit.filter) From aab8e9364a63765c744ed8bc6a6ca4c85b89c822 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Thu, 14 Jul 2022 14:09:02 -0500 Subject: [PATCH 05/18] Actually test for dataset existence when calling datasetExists. --- tests/test_middleware_interface.py | 46 ++++++++++++++++++------------ 1 file changed, 27 insertions(+), 19 deletions(-) diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index b556bd8f..24f09238 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -172,10 +172,12 @@ def test_prep_butler(self): collections="refcats")) # Check that the right skymap is in the chained output collection. - self.butler.datasetExists("skyMap", - # TODO: we shouldn't need skymap here? - skymap="deepCoadd_skyMap", - collections=self.interface.output_collection) + self.assertTrue( + self.butler.datasetExists("skyMap", + # TODO: we shouldn't need skymap here? + skymap="deepCoadd_skyMap", + collections=self.interface.output_collection) + ) # These shards were identified by plotting the objects in each shard # on-sky and overplotting the detector corners. @@ -185,20 +187,24 @@ def test_prep_butler(self): self.assertEqual(expected_shards, [x['htm7'] for x in loaded_shards]) # Check that the right calibs are in the chained output collection. try: - self.butler.datasetExists('cpBias', detector=56, instrument='DECam', - collections="DECam/calib/20150218T000000Z") - # TODO: Have to use the exact run collection, because we can't - # query by validity range. - # collections=self.interface.output_collection) + self.assertTrue( + self.butler.datasetExists('cpBias', detector=56, instrument='DECam', + collections="DECam/calib/20150218T000000Z") + # TODO: Have to use the exact run collection, because we can't + # query by validity range. + # collections=self.interface.output_collection) + ) except LookupError: self.fail("Bias file missing from local butler.") try: - self.butler.datasetExists('cpFlat', detector=56, instrument='DECam', - physical_filter=filter, - collections="DECam/calib/20150218T000000Z") - # TODO: Have to use the exact run collection, because we can't - # query by validity range. - # collections=self.interface.output_collection) + self.assertTrue( + self.butler.datasetExists('cpFlat', detector=56, instrument='DECam', + physical_filter=filter, + collections="DECam/calib/20150218T000000Z") + # TODO: Have to use the exact run collection, because we can't + # query by validity range. + # collections=self.interface.output_collection) + ) except LookupError: self.fail("Flat file missing from local butler.") @@ -210,10 +216,12 @@ def test_prep_butler(self): # Need to refresh the butler to get all the dimensions/collections. self.butler.registry.refresh() for patch in (464, 465, 509, 510): - self.butler.datasetExists('deepCoadd', tract=0, patch=patch, band="g", - # TODO: we shouldn't need skymap here? - skymap="deepCoadd_skyMap", - collections=self.interface.output_collection) + self.assertTrue( + self.butler.datasetExists('deepCoadd', tract=0, patch=patch, band="g", + # TODO: we shouldn't need skymap here? + skymap="deepCoadd_skyMap", + collections=self.interface.output_collection) + ) @unittest.skip("We know this doesn't work (skymaps!), but this is a test we want to have!") def test_prep_butler_twice(self): From ffe6aca24ed8211e109b9aa0ecefbdd948253034 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Thu, 14 Jul 2022 13:59:15 -0500 Subject: [PATCH 06/18] Factor out queries from test_prep_butler. --- tests/test_middleware_interface.py | 66 ++++++++++++++++-------------- 1 file changed, 35 insertions(+), 31 deletions(-) diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index 24f09238..27151ba1 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -158,27 +158,26 @@ def test_init(self): self.assertEqual(self.interface.rawIngestTask.config.failFast, True) self.assertEqual(self.interface.rawIngestTask.config.transfer, "copy") - def test_prep_butler(self): - """Test that the butler has all necessary data for the next visit. + def _check_imports(self, butler): + """Test that the butler has the expected supporting data. """ - self.interface.prep_butler(self.next_visit) - self.assertEqual(self.butler.get('camera', - instrument=instname, - collections=[f"{instname}/calib/unbounded"]).getName(), instname) - - # check that we got appropriate refcat shards - loaded_shards = list(self.butler.registry.queryDataIds("htm7", - datasets="gaia", - collections="refcats")) + self.assertEqual(butler.get('camera', + instrument=instname, + collections=[f"{instname}/calib/unbounded"]).getName(), instname) # Check that the right skymap is in the chained output collection. self.assertTrue( - self.butler.datasetExists("skyMap", - # TODO: we shouldn't need skymap here? - skymap="deepCoadd_skyMap", - collections=self.interface.output_collection) + butler.datasetExists("skyMap", + # TODO: we shouldn't need skymap here? + skymap="deepCoadd_skyMap", + collections=self.interface.output_collection) ) + # check that we got appropriate refcat shards + loaded_shards = list(butler.registry.queryDataIds("htm7", + datasets="gaia", + collections="refcats")) + # These shards were identified by plotting the objects in each shard # on-sky and overplotting the detector corners. # TODO DM-34112: check these shards again with some plots, once I've @@ -188,8 +187,8 @@ def test_prep_butler(self): # Check that the right calibs are in the chained output collection. try: self.assertTrue( - self.butler.datasetExists('cpBias', detector=56, instrument='DECam', - collections="DECam/calib/20150218T000000Z") + butler.datasetExists('cpBias', detector=56, instrument='DECam', + collections="DECam/calib/20150218T000000Z") # TODO: Have to use the exact run collection, because we can't # query by validity range. # collections=self.interface.output_collection) @@ -198,9 +197,9 @@ def test_prep_butler(self): self.fail("Bias file missing from local butler.") try: self.assertTrue( - self.butler.datasetExists('cpFlat', detector=56, instrument='DECam', - physical_filter=filter, - collections="DECam/calib/20150218T000000Z") + butler.datasetExists('cpFlat', detector=56, instrument='DECam', + physical_filter=filter, + collections="DECam/calib/20150218T000000Z") # TODO: Have to use the exact run collection, because we can't # query by validity range. # collections=self.interface.output_collection) @@ -208,20 +207,25 @@ def test_prep_butler(self): except LookupError: self.fail("Flat file missing from local butler.") - # Check that we configured the right pipeline. - self.assertEqual(self.interface.pipeline._pipelineIR.description, - "End to end Alert Production pipeline specialized for HiTS-2015") - # Check that the right templates are in the chained output collection. # Need to refresh the butler to get all the dimensions/collections. - self.butler.registry.refresh() + butler.registry.refresh() for patch in (464, 465, 509, 510): - self.assertTrue( - self.butler.datasetExists('deepCoadd', tract=0, patch=patch, band="g", - # TODO: we shouldn't need skymap here? - skymap="deepCoadd_skyMap", - collections=self.interface.output_collection) - ) + butler.datasetExists('deepCoadd', tract=0, patch=patch, band="g", + # TODO: we shouldn't need skymap here? + skymap="deepCoadd_skyMap", + collections=self.interface.output_collection) + + def test_prep_butler(self): + """Test that the butler has all necessary data for the next visit. + """ + self.interface.prep_butler(self.next_visit) + + self._check_imports(self.butler) + + # Check that we configured the right pipeline. + self.assertEqual(self.interface.pipeline._pipelineIR.description, + "End to end Alert Production pipeline specialized for HiTS-2015") @unittest.skip("We know this doesn't work (skymaps!), but this is a test we want to have!") def test_prep_butler_twice(self): From 2613a51023a8f2a326d3134a9c1d56f8a114832b Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Wed, 13 Jul 2022 19:39:00 -0500 Subject: [PATCH 07/18] Implement _query_missing_datasets. --- python/activator/middleware_interface.py | 37 +++++++++++++++++++ tests/test_middleware_interface.py | 47 +++++++++++++++++++++++- 2 files changed, 83 insertions(+), 1 deletion(-) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index 16b3d42b..cecc2aae 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -21,6 +21,8 @@ __all__ = ["MiddlewareInterface"] +import collections.abc +import itertools import logging import os import os.path @@ -458,3 +460,38 @@ def run_pipeline(self, visit: Visit, exposure_ids: set) -> None: # *Diff_diaSrcTable, etc. have not been registered. result = executor.run(register_dataset_types=True) _log.info(f"Pipeline successfully run on {len(result)} quanta.") + + +def _query_missing_datasets(src_repo: Butler, dest_repo: Butler, + *args, **kwargs) -> collections.abc.Iterable[lsst.daf.butler.DatasetRef]: + """Return datasets that are present in one repository but not another. + + Parameters + ---------- + src_repo : `lsst.daf.butler.Butler` + The repository in which a dataset must be present. + dest_repo : `lsst.daf.butler.Butler` + The repository in which a dataset must not be present. + *args, **kwargs + Parameters for describing the dataset query. They have the same + meanings as the parameters of `lsst.daf.butler.Registry.queryDatasets`. + + Returns + ------- + datasets : iterable [`lsst.daf.butler.DatasetRef`] + The datasets that exist in ``src_repo`` but not ``dest_repo``. + """ + try: + known_datasets = set(dest_repo.registry.queryDatasets(*args, **kwargs)) + except lsst.daf.butler.registry.DataIdValueError as e: + _log.debug("Pre-export query with args '%s, %s' failed with %s", + ", ".join(repr(a) for a in args), + ", ".join(f"{k}={v!r}" for k, v in kwargs.items()), + e) + # If dimensions are invalid, then *any* such datasets are missing. + known_datasets = set() + + # Let exceptions from src_repo query raise: if it fails, that invalidates + # this operation. + return itertools.filterfalse(lambda ref: ref in known_datasets, + src_repo.registry.queryDatasets(*args, **kwargs)) diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index 27151ba1..f375842f 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -19,6 +19,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . +import itertools import tempfile import os.path import unittest @@ -34,7 +35,7 @@ import lsst.resources from activator.visit import Visit -from activator.middleware_interface import MiddlewareInterface +from activator.middleware_interface import MiddlewareInterface, _query_missing_datasets # The short name of the instrument used in the test repo. instname = "DECam" @@ -321,3 +322,47 @@ def test_run_pipeline_empty_quantum_graph(self): with self.assertRaisesRegex(RuntimeError, "No data to process"): self.interface.run_pipeline(self.next_visit, {2}) + + def test_query_missing_datasets(self): + """Test that query_missing_datasets provides the correct values. + """ + # Much easier to create DatasetRefs with a real repo. + butler = self.interface.central_butler + dtype = butler.registry.getDatasetType("cpBias") + data1 = lsst.daf.butler.DatasetRef(dtype, {"instrument": "DECam", "detector": 5}) + data2 = lsst.daf.butler.DatasetRef(dtype, {"instrument": "DECam", "detector": 25}) + data3 = lsst.daf.butler.DatasetRef(dtype, {"instrument": "DECam", "detector": 42}) + + for src, existing in itertools.product([set(), {data1, data2}, {data1, data2, data3}], repeat=2): + diff = src - existing + src_butler = unittest.mock.Mock(**{"registry.queryDatasets.return_value": src}) + existing_butler = unittest.mock.Mock(**{"registry.queryDatasets.return_value": existing}) + + with self.subTest(src=sorted(ref.dataId["detector"] for ref in src), + existing=sorted(ref.dataId["detector"] for ref in existing)): + result = set(_query_missing_datasets(src_butler, existing_butler, + "cpBias", instrument="DECam")) + src_butler.registry.queryDatasets.assert_called_once_with("cpBias", instrument="DECam") + existing_butler.registry.queryDatasets.assert_called_once_with("cpBias", instrument="DECam") + self.assertEqual(result, diff) + + def test_query_missing_datasets_nodim(self): + """Test that query_missing_datasets provides the correct values when + the destination repository is missing not only datasets, but the + dimensions to define them. + """ + # Much easier to create DatasetRefs with a real repo. + butler = self.interface.central_butler + dtype = butler.registry.getDatasetType("skyMap") + data1 = lsst.daf.butler.DatasetRef(dtype, {"skymap": "mymap"}) + + src_butler = unittest.mock.Mock(**{"registry.queryDatasets.return_value": {data1}}) + existing_butler = unittest.mock.Mock( + **{"registry.queryDatasets.side_effect": + lsst.daf.butler.registry.DataIdValueError( + "Unknown values specified for governor dimension skymap: {'mymap'}") + }) + + result = set(_query_missing_datasets(src_butler, existing_butler, "skyMap", ..., skymap="mymap")) + src_butler.registry.queryDatasets.assert_called_once_with("skyMap", ..., skymap="mymap") + self.assertEqual(result, {data1}) From 8e89eb901eaad86df291c9985a4106c5cc135469 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Thu, 14 Jul 2022 13:44:04 -0500 Subject: [PATCH 08/18] Use _query_missing_datasets to skip redundant export/imports. --- python/activator/middleware_interface.py | 20 ++++++++++++-------- tests/test_middleware_interface.py | 2 +- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index cecc2aae..2f91dac4 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -250,7 +250,8 @@ def _export_refcats(self, export, center, radius): # collection, so we have to specify a list here. Replace this # with another solution ASAP. possible_refcats = ["gaia", "panstarrs", "gaia_dr2_20200414", "ps1_pv3_3pi_20170110"] - export.saveDatasets(self.central_butler.registry.queryDatasets( + export.saveDatasets(_query_missing_datasets( + self.central_butler, self.butler, possible_refcats, collections=self.instrument.makeRefCatCollectionName(), where=htm_where, @@ -276,9 +277,10 @@ def _export_skymap_and_templates(self, export, center, detector, wcs): # TODO: We only want to import the skymap dimension once in init, # otherwise we get a UNIQUE constraint error when prepping for the # second visit. - export.saveDatasets(self.central_butler.registry.queryDatasets("skyMap", - collections=self._COLLECTION_SKYMAP, - findFirst=True)) + export.saveDatasets(_query_missing_datasets(self.central_butler, self.butler, + "skyMap", + collections=self._COLLECTION_SKYMAP, + findFirst=True)) # Getting only one tract should be safe: we're getting the # tract closest to this detector, so we should be well within # the tract bbox. @@ -294,9 +296,10 @@ def _export_skymap_and_templates(self, export, center, detector, wcs): # TODO: alternately, we need to extract it from the pipeline? (best?) # TODO: alternately, can we just assume that there is exactly # one coadd type in the central butler? - export.saveDatasets(self.central_butler.registry.queryDatasets("*Coadd", - collections=self._COLLECTION_TEMPLATE, - where=template_where)) + export.saveDatasets(_query_missing_datasets(self.central_butler, self.butler, + "*Coadd", + collections=self._COLLECTION_TEMPLATE, + where=template_where)) def _export_calibs(self, export, detector_id, filter): """Export the calibs for this visit from the central butler. @@ -314,7 +317,8 @@ def _export_calibs(self, export, detector_id, filter): # supported in queryDatasets yet. calib_where = f"detector={detector_id} and physical_filter='{filter}'" export.saveDatasets( - self.central_butler.registry.queryDatasets( + _query_missing_datasets( + self.central_butler, self.butler, ..., collections=self.instrument.makeCalibrationCollectionName(), where=calib_where), diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index f375842f..94351f37 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -228,7 +228,6 @@ def test_prep_butler(self): self.assertEqual(self.interface.pipeline._pipelineIR.description, "End to end Alert Production pipeline specialized for HiTS-2015") - @unittest.skip("We know this doesn't work (skymaps!), but this is a test we want to have!") def test_prep_butler_twice(self): """prep_butler should have the correct calibs (and not raise an exception!) on a second run with the same, or a different detector. @@ -239,6 +238,7 @@ def test_prep_butler_twice(self): self.interface.prep_butler(self.next_visit) # TODO: update next_visit with a new group number self.interface.prep_butler(self.next_visit) + self._check_imports(self.butler) def test_ingest_image(self): filename = "fakeRawImage.fits" From 76f00a9fddd22e29eb5f65ff049dae601de4dce0 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Thu, 14 Jul 2022 14:29:37 -0500 Subject: [PATCH 09/18] Allow _check_imports to have a variable detector. --- tests/test_middleware_interface.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index 94351f37..9628dd09 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -159,7 +159,7 @@ def test_init(self): self.assertEqual(self.interface.rawIngestTask.config.failFast, True) self.assertEqual(self.interface.rawIngestTask.config.transfer, "copy") - def _check_imports(self, butler): + def _check_imports(self, butler, detector): """Test that the butler has the expected supporting data. """ self.assertEqual(butler.get('camera', @@ -188,7 +188,7 @@ def _check_imports(self, butler): # Check that the right calibs are in the chained output collection. try: self.assertTrue( - butler.datasetExists('cpBias', detector=56, instrument='DECam', + butler.datasetExists('cpBias', detector=detector, instrument='DECam', collections="DECam/calib/20150218T000000Z") # TODO: Have to use the exact run collection, because we can't # query by validity range. @@ -198,7 +198,7 @@ def _check_imports(self, butler): self.fail("Bias file missing from local butler.") try: self.assertTrue( - butler.datasetExists('cpFlat', detector=56, instrument='DECam', + butler.datasetExists('cpFlat', detector=detector, instrument='DECam', physical_filter=filter, collections="DECam/calib/20150218T000000Z") # TODO: Have to use the exact run collection, because we can't @@ -222,7 +222,7 @@ def test_prep_butler(self): """ self.interface.prep_butler(self.next_visit) - self._check_imports(self.butler) + self._check_imports(self.butler, detector=56) # Check that we configured the right pipeline. self.assertEqual(self.interface.pipeline._pipelineIR.description, @@ -238,7 +238,7 @@ def test_prep_butler_twice(self): self.interface.prep_butler(self.next_visit) # TODO: update next_visit with a new group number self.interface.prep_butler(self.next_visit) - self._check_imports(self.butler) + self._check_imports(self.butler, detector=56) def test_ingest_image(self): filename = "fakeRawImage.fits" From 6d14661ab4c1a9c9f4dc028f3fa2b22f950354ba Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Thu, 14 Jul 2022 14:58:16 -0500 Subject: [PATCH 10/18] Allow _check_imports to have variable shards. --- tests/test_middleware_interface.py | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index 9628dd09..48120810 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -159,7 +159,7 @@ def test_init(self): self.assertEqual(self.interface.rawIngestTask.config.failFast, True) self.assertEqual(self.interface.rawIngestTask.config.transfer, "copy") - def _check_imports(self, butler, detector): + def _check_imports(self, butler, detector, expected_shards): """Test that the butler has the expected supporting data. """ self.assertEqual(butler.get('camera', @@ -175,16 +175,11 @@ def _check_imports(self, butler, detector): ) # check that we got appropriate refcat shards - loaded_shards = list(butler.registry.queryDataIds("htm7", - datasets="gaia", - collections="refcats")) + loaded_shards = butler.registry.queryDataIds("htm7", + datasets="gaia", + collections="refcats") - # These shards were identified by plotting the objects in each shard - # on-sky and overplotting the detector corners. - # TODO DM-34112: check these shards again with some plots, once I've - # determined whether ci_hits2015 actually has enough shards. - expected_shards = [157394, 157401, 157405] - self.assertEqual(expected_shards, [x['htm7'] for x in loaded_shards]) + self.assertEqual(expected_shards, {x['htm7'] for x in loaded_shards}) # Check that the right calibs are in the chained output collection. try: self.assertTrue( @@ -222,7 +217,12 @@ def test_prep_butler(self): """ self.interface.prep_butler(self.next_visit) - self._check_imports(self.butler, detector=56) + # These shards were identified by plotting the objects in each shard + # on-sky and overplotting the detector corners. + # TODO DM-34112: check these shards again with some plots, once I've + # determined whether ci_hits2015 actually has enough shards. + expected_shards = {157394, 157401, 157405} + self._check_imports(self.butler, detector=56, expected_shards=expected_shards) # Check that we configured the right pipeline. self.assertEqual(self.interface.pipeline._pipelineIR.description, @@ -238,7 +238,8 @@ def test_prep_butler_twice(self): self.interface.prep_butler(self.next_visit) # TODO: update next_visit with a new group number self.interface.prep_butler(self.next_visit) - self._check_imports(self.butler, detector=56) + expected_shards = {157394, 157401, 157405} + self._check_imports(self.butler, detector=56, expected_shards=expected_shards) def test_ingest_image(self): filename = "fakeRawImage.fits" From cfc04fc8e584616db6bd185548a06367b591cf72 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Thu, 14 Jul 2022 17:12:26 -0500 Subject: [PATCH 11/18] Expand double-registration test. Multiple visits and detectors are more representative of the kinds of double-registrations we'd expect in deployment. --- python/activator/middleware_interface.py | 3 --- tests/test_middleware_interface.py | 28 +++++++++++++++++++++++- 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index 2f91dac4..e55eaa57 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -274,9 +274,6 @@ def _export_skymap_and_templates(self, export, center, detector, wcs): """ # TODO: This exports the whole skymap, but we want to only export the # subset of the skymap that covers this data. - # TODO: We only want to import the skymap dimension once in init, - # otherwise we get a UNIQUE constraint error when prepping for the - # second visit. export.saveDatasets(_query_missing_datasets(self.central_butler, self.butler, "skyMap", collections=self._COLLECTION_SKYMAP, diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index 48120810..ae8eb69c 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -236,11 +236,37 @@ def test_prep_butler_twice(self): the skymap in init" problem. """ self.interface.prep_butler(self.next_visit) - # TODO: update next_visit with a new group number + + # Second visit with everything same except group. + self.next_visit = Visit(instrument=self.next_visit.instrument, + detector=self.next_visit.detector, + group=self.next_visit.group + 1, + snaps=self.next_visit.snaps, + filter=self.next_visit.filter, + ra=self.next_visit.ra, + dec=self.next_visit.dec, + rot=self.next_visit.rot, + kind=self.next_visit.kind) self.interface.prep_butler(self.next_visit) expected_shards = {157394, 157401, 157405} self._check_imports(self.butler, detector=56, expected_shards=expected_shards) + # Third visit with different detector and coordinates. + # Only 5, 10, 56, 60 have valid calibs. + self.next_visit = Visit(instrument=self.next_visit.instrument, + detector=5, + group=self.next_visit.group + 1, + snaps=self.next_visit.snaps, + filter=self.next_visit.filter, + # Offset by a bit over 1 patch. + ra=self.next_visit.ra + 0.4, + dec=self.next_visit.dec - 0.4, + rot=self.next_visit.rot, + kind=self.next_visit.kind) + self.interface.prep_butler(self.next_visit) + expected_shards.update({157218, 157229}) + self._check_imports(self.butler, detector=5, expected_shards=expected_shards) + def test_ingest_image(self): filename = "fakeRawImage.fits" filepath = os.path.join(self.input_data, filename) From 4475078ae88683a633bff01f11e4f68aa43b5050 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Wed, 3 Aug 2022 16:55:58 -0500 Subject: [PATCH 12/18] Add string representation for Visit. --- python/activator/visit.py | 6 ++++++ tests/test_visit.py | 5 +++++ 2 files changed, 11 insertions(+) diff --git a/python/activator/visit.py b/python/activator/visit.py index b6065fac..1df0dfc8 100644 --- a/python/activator/visit.py +++ b/python/activator/visit.py @@ -16,3 +16,9 @@ class Visit: dec: float rot: float kind: str + + def __str__(self): + """Return a short string that disambiguates the visit but does not + include "metadata" fields. + """ + return f"(instrument={self.instrument}, group={self.group}, detector={self.detector})" diff --git a/tests/test_visit.py b/tests/test_visit.py index 2daa5688..a896e2c7 100644 --- a/tests/test_visit.py +++ b/tests/test_visit.py @@ -55,3 +55,8 @@ def test_json(self): serialized = json.dumps(self.testbed.__dict__).encode("utf-8") deserialized = Visit(**json.loads(serialized)) self.assertEqual(deserialized, self.testbed) + + def test_str(self): + self.assertNotEqual(str(self.testbed), repr(self.testbed)) + self.assertIn(str(self.testbed.detector), str(self.testbed)) + self.assertIn(str(self.testbed.group), str(self.testbed)) From 9396b46472f7a3eac8f4848fd03779cf5ea8cc80 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Fri, 15 Jul 2022 15:30:19 -0500 Subject: [PATCH 13/18] Clarify image-specific log messages. Detector number is useful for tracking parallel processing, but can't be put in the log labels because neither processes nor objects are uniquely associated with a detector. --- python/activator/activator.py | 16 +++++----------- python/activator/middleware_interface.py | 5 +++-- 2 files changed, 8 insertions(+), 13 deletions(-) diff --git a/python/activator/activator.py b/python/activator/activator.py index 5afbf88f..1e2a57f0 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -184,10 +184,7 @@ def next_visit_handler() -> Tuple[str, int]: mwi.ingest_image(oid) expid_set.add(m.group('expid')) - _log.debug( - "Waiting for snaps from group" - f" '{expected_visit.group}' detector {expected_visit.detector}" - ) + _log.debug(f"Waiting for snaps from {expected_visit}.") start = time.time() while len(expid_set) < expected_visit.snaps: response = subscriber.pull( @@ -198,14 +195,11 @@ def next_visit_handler() -> Tuple[str, int]: end = time.time() if len(response.received_messages) == 0: if end - start < timeout: - _log.debug( - f"Empty pull after {end - start}s" - f" for group '{expected_visit.group}'" - ) + _log.debug(f"Empty pull after {end - start}s for {expected_visit}.") continue _log.warning( - "Timed out waiting for image in" - f" group '{expected_visit.group}' after receiving exposures {expid_set}" + f"Timed out waiting for image in {expected_visit} " + f"after receiving exposures {expid_set}" ) break @@ -231,7 +225,7 @@ def next_visit_handler() -> Tuple[str, int]: subscriber.acknowledge(subscription=subscription.name, ack_ids=ack_list) # Got all the snaps; run the pipeline - _log.info(f"Running pipeline on group: {expected_visit.group} detector: {expected_visit.detector}") + _log.info(f"Running pipeline on {expected_visit}.") mwi.run_pipeline(expected_visit, expid_set) return "Pipeline executed", 200 finally: diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index e55eaa57..977fb040 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -204,7 +204,7 @@ def prep_butler(self, visit: Visit) -> None: visit : `Visit` Group of snaps from one detector to prepare the butler for. """ - _log.info(f"Preparing Butler for visit '{visit}'") + _log.info(f"Preparing Butler for visit {visit!r}") detector = self.camera[visit.detector] wcs = self._predict_wcs(detector, visit) @@ -460,7 +460,8 @@ def run_pipeline(self, visit: Visit, exposure_ids: set) -> None: # If this is a fresh (local) repo, then types like calexp, # *Diff_diaSrcTable, etc. have not been registered. result = executor.run(register_dataset_types=True) - _log.info(f"Pipeline successfully run on {len(result)} quanta.") + _log.info(f"Pipeline successfully run on {len(result)} quanta for " + f"detector {visit.detector} of {exposure_ids}.") def _query_missing_datasets(src_repo: Butler, dest_repo: Butler, From e80460a3cd23f481f9ebf7a85992b14576d60f5d Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Tue, 19 Jul 2022 16:29:33 -0500 Subject: [PATCH 14/18] Handle clean failure when all exposures time out. --- python/activator/activator.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/python/activator/activator.py b/python/activator/activator.py index 1e2a57f0..60ea2f27 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -224,10 +224,18 @@ def next_visit_handler() -> Tuple[str, int]: _log.error(f"Failed to match object id '{oid}'") subscriber.acknowledge(subscription=subscription.name, ack_ids=ack_list) - # Got all the snaps; run the pipeline - _log.info(f"Running pipeline on {expected_visit}.") - mwi.run_pipeline(expected_visit, expid_set) - return "Pipeline executed", 200 + if expid_set: + # Got at least some snaps; run the pipeline. + # If this is only a partial set, the processed results may still be + # useful for quality purposes. + if len(expid_set) < expected_visit.snaps: + _log.warning(f"Processing {len(expid_set)} snaps, expected {expected_visit.snaps}.") + _log.info(f"Running pipeline on {expected_visit}.") + mwi.run_pipeline(expected_visit, expid_set) + return "Pipeline executed", 200 + else: + _log.fatal(f"Timed out waiting for images for {expected_visit}.") + return "Timed out waiting for images", 500 finally: subscriber.delete_subscription(subscription=subscription.name) From 6694101c5da4900455b4da72b617022cb55af13e Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Tue, 19 Jul 2022 17:01:15 -0500 Subject: [PATCH 15/18] Add debugging logs for datasets. --- python/activator/middleware_interface.py | 73 ++++++++++++++++++------ 1 file changed, 54 insertions(+), 19 deletions(-) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index 977fb040..03209e04 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -250,12 +250,14 @@ def _export_refcats(self, export, center, radius): # collection, so we have to specify a list here. Replace this # with another solution ASAP. possible_refcats = ["gaia", "panstarrs", "gaia_dr2_20200414", "ps1_pv3_3pi_20170110"] - export.saveDatasets(_query_missing_datasets( - self.central_butler, self.butler, - possible_refcats, - collections=self.instrument.makeRefCatCollectionName(), - where=htm_where, - findFirst=True)) + refcats = set(_query_missing_datasets( + self.central_butler, self.butler, + possible_refcats, + collections=self.instrument.makeRefCatCollectionName(), + where=htm_where, + findFirst=True)) + _log.debug("Found %d new refcat datasets.", len(refcats)) + export.saveDatasets(refcats) def _export_skymap_and_templates(self, export, center, detector, wcs): """Export the skymap and templates for this visit from the central @@ -274,10 +276,12 @@ def _export_skymap_and_templates(self, export, center, detector, wcs): """ # TODO: This exports the whole skymap, but we want to only export the # subset of the skymap that covers this data. - export.saveDatasets(_query_missing_datasets(self.central_butler, self.butler, - "skyMap", - collections=self._COLLECTION_SKYMAP, - findFirst=True)) + skymaps = set(_query_missing_datasets(self.central_butler, self.butler, + "skyMap", + collections=self._COLLECTION_SKYMAP, + findFirst=True)) + _log.debug("Found %d new skymap datasets.", len(skymaps)) + export.saveDatasets(skymaps) # Getting only one tract should be safe: we're getting the # tract closest to this detector, so we should be well within # the tract bbox. @@ -293,10 +297,12 @@ def _export_skymap_and_templates(self, export, center, detector, wcs): # TODO: alternately, we need to extract it from the pipeline? (best?) # TODO: alternately, can we just assume that there is exactly # one coadd type in the central butler? - export.saveDatasets(_query_missing_datasets(self.central_butler, self.butler, - "*Coadd", - collections=self._COLLECTION_TEMPLATE, - where=template_where)) + templates = set(_query_missing_datasets(self.central_butler, self.butler, + "*Coadd", + collections=self._COLLECTION_TEMPLATE, + where=template_where)) + _log.debug("Found %d new template datasets.", len(templates)) + export.saveDatasets(templates) def _export_calibs(self, export, detector_id, filter): """Export the calibs for this visit from the central butler. @@ -313,18 +319,47 @@ def _export_calibs(self, export, detector_id, filter): # TODO: we can't filter by validity range because it's not # supported in queryDatasets yet. calib_where = f"detector={detector_id} and physical_filter='{filter}'" + calibs = set(_query_missing_datasets( + self.central_butler, self.butler, + ..., + collections=self.instrument.makeCalibrationCollectionName(), + where=calib_where)) + if calibs: + for dataset_type, n_datasets in self._count_by_type(calibs): + _log.debug("Found %d new calib datasets of type '%s'.", n_datasets, dataset_type) + else: + _log.debug("Found 0 new calib datasets.") export.saveDatasets( - _query_missing_datasets( - self.central_butler, self.butler, - ..., - collections=self.instrument.makeCalibrationCollectionName(), - where=calib_where), + calibs, elements=[]) # elements=[] means do not export dimension records target_types = {CollectionType.CALIBRATION} for collection in self.central_butler.registry.queryCollections(..., collectionTypes=target_types): export.saveCollection(collection) + @staticmethod + def _count_by_type(refs): + """Count the number of dataset references of each type. + + Parameters + ---------- + refs : iterable [`lsst.daf.butler.DatasetRef`] + The references to classify. + + Yields + ------ + type : `str` + The name of a dataset type in ``refs``. + count : `int` + The number of elements of type ``type`` in ``refs``. + """ + def get_key(ref): + return ref.datasetType.name + + ordered = sorted(refs, key=get_key) + for k, g in itertools.groupby(ordered, key=get_key): + yield k, len(list(g)) + def _prep_collections(self): """Pre-register output collections in advance of running the pipeline. """ From 9a7416370bf8c5a13c98c16bf06a285b8da7fc44 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Mon, 25 Jul 2022 13:22:07 -0500 Subject: [PATCH 16/18] Log local repository creation. --- python/activator/activator.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/activator/activator.py b/python/activator/activator.py index 60ea2f27..02da9eea 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -94,6 +94,7 @@ inferDefaults=True) repo = f"/tmp/butler-{os.getpid()}" butler = Butler(Butler.makeRepo(repo), writeable=True) +_log.info("Created local Butler repo at %s.", repo) mwi = MiddlewareInterface(central_butler, image_bucket, config_instrument, butler) From 57ef05938c59af606afa97eeb134c517ca4f79bc Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Wed, 27 Jul 2022 13:28:57 -0500 Subject: [PATCH 17/18] Refresh local butler before prepping it. --- python/activator/middleware_interface.py | 3 +++ tests/test_middleware_interface.py | 3 +++ 2 files changed, 6 insertions(+) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index 03209e04..908facc9 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -210,6 +210,9 @@ def prep_butler(self, visit: Visit) -> None: wcs = self._predict_wcs(detector, visit) center, radius = self._detector_bounding_circle(detector, wcs) + # Need up-to-date census of what's already present. + self.butler.registry.refresh() + with tempfile.NamedTemporaryFile(mode="w+b", suffix=".yaml") as export_file: with self.central_butler.export(filename=export_file.name, format="yaml") as export: self._export_refcats(export, center, radius) diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index ae8eb69c..21d386fd 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -267,6 +267,9 @@ def test_prep_butler_twice(self): expected_shards.update({157218, 157229}) self._check_imports(self.butler, detector=5, expected_shards=expected_shards) + # TODO: regression test for prep_butler having a stale cache for the butler it's updating. + # This may be impossible to unit test, since it seems to depend on Google-side parallelism. + def test_ingest_image(self): filename = "fakeRawImage.fits" filepath = os.path.join(self.input_data, filename) From c00d100e941369b17e7ec88c67109d4412d30274 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Wed, 27 Jul 2022 15:39:27 -0500 Subject: [PATCH 18/18] Remove TODO that was addressed on Confluence. --- python/activator/activator.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/activator/activator.py b/python/activator/activator.py index 02da9eea..11af70ef 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -84,7 +84,6 @@ # However, we don't want MiddlewareInterface to need to know details like where # the central repo is located, either, so perhaps we need a new module. central_butler = Butler(calib_repo, - # TODO: investigate whether these defaults, esp. skymap, slow down queries instrument=active_instrument.getName(), # NOTE: with inferDefaults=True, it's possible we don't need to hardcode this # value from the real repository.