From cafe9f350f8b2dcdc8ace8f53d39a457c518dfe7 Mon Sep 17 00:00:00 2001
From: Krzysztof Findeisen
Date: Wed, 24 Aug 2022 10:19:01 -0700
Subject: [PATCH 01/20] Use correct types in Visit objects.

`Visit.group` is documented as being `str`, but the test cases make it
an `int`.
---
 python/tester/upload.py            | 6 +++---
 tests/test_middleware_interface.py | 6 +++---
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/python/tester/upload.py b/python/tester/upload.py
index 58d8e360..9414372b 100644
--- a/python/tester/upload.py
+++ b/python/tester/upload.py
@@ -213,7 +213,7 @@ def make_random_visits(instrument, group):
     ----------
     instrument : `str`
         The short name of the instrument carrying out the observation.
-    group : `int`
+    group : `str`
         The group number being observed.
 
     Returns
@@ -344,7 +344,7 @@ def upload_from_raws(publisher, instrument, raw_pool, src_bucket, dest_bucket, n
                         "unobserved raws are available.")
 
     for i, true_group in enumerate(itertools.islice(raw_pool, n_groups)):
-        group = group_base + i
+        group = str(group_base + i)
         _log.debug(f"Processing group {group} from unobserved {true_group}...")
         # snap_dict maps snap_id to {visit: blob}
         snap_dict = {}
@@ -390,7 +390,7 @@ def upload_from_random(publisher, instrument, dest_bucket, n_groups, group_base)
         The base number from which to offset new group numbers.
     """
     for i in range(n_groups):
-        group = group_base + i
+        group = str(group_base + i)
         visit_infos = make_random_visits(instrument, group)
 
         # TODO: may be cleaner to use a functor object than to depend on
diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py
index 6eb1a6ff..4ad7e1ec 100644
--- a/tests/test_middleware_interface.py
+++ b/tests/test_middleware_interface.py
@@ -129,7 +129,7 @@ def setUp(self):
         rot = 90.
         self.next_visit = Visit(instrument_name,
                                 detector=56,
-                                group=1,
+                                group="1",
                                 snaps=1,
                                 filter=filter,
                                 ra=ra,
@@ -238,7 +238,7 @@ def test_prep_butler_twice(self):
         # Second visit with everything same except group.
         self.next_visit = Visit(instrument=self.next_visit.instrument,
                                 detector=self.next_visit.detector,
-                                group=self.next_visit.group + 1,
+                                group=str(int(self.next_visit.group) + 1),
                                 snaps=self.next_visit.snaps,
                                 filter=self.next_visit.filter,
                                 ra=self.next_visit.ra,
@@ -253,7 +253,7 @@ def test_prep_butler_twice(self):
         # Only 5, 10, 56, 60 have valid calibs.
         self.next_visit = Visit(instrument=self.next_visit.instrument,
                                 detector=5,
-                                group=self.next_visit.group + 1,
+                                group=str(int(self.next_visit.group) + 1),
                                 snaps=self.next_visit.snaps,
                                 filter=self.next_visit.filter,
                                 # Offset by a bit over 1 patch.

From 63b215bd5085df5e4802e17a004867dd8fe3d71c Mon Sep 17 00:00:00 2001
From: Krzysztof Findeisen
Date: Wed, 24 Aug 2022 10:29:59 -0700
Subject: [PATCH 02/20] Use dataclasses.replace to update Visit objects.

Using this function instead of homebrew systems makes the code much
more concise.
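As an illustration (hypothetical Visit values, not part of this change),
the removed `splice_group` helper reduces to a single standard-library
call:

    import dataclasses

    # Copies every field of old_visit except `group`; an unknown field
    # name raises TypeError instead of silently producing a stale Visit.
    new_visit = dataclasses.replace(old_visit, group="42")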
---
 python/tester/upload.py            | 33 +++---------------------------
 tests/test_middleware_interface.py | 28 ++++++++-----------------
 2 files changed, 12 insertions(+), 49 deletions(-)

diff --git a/python/tester/upload.py b/python/tester/upload.py
index 9414372b..5c414195 100644
--- a/python/tester/upload.py
+++ b/python/tester/upload.py
@@ -1,4 +1,4 @@
-from dataclasses import dataclass
+import dataclasses
 from google.cloud import pubsub_v1, storage
 from google.oauth2 import service_account
 import itertools
@@ -12,7 +12,7 @@
 from activator.visit import Visit
 
 
-@dataclass
+@dataclasses.dataclass
 class Instrument:
     n_snaps: int
     n_detectors: int
@@ -352,7 +352,7 @@ def upload_from_raws(publisher, instrument, raw_pool, src_bucket, dest_bucket, n
             # replacing the (immutable) Visit objects to point to group
             # instead of true_group.
             for snap_id, old_visits in raw_pool[true_group].items():
-                snap_dict[snap_id] = {splice_group(true_visit, group): blob
+                snap_dict[snap_id] = {dataclasses.replace(true_visit, group=group): blob
                                       for true_visit, blob in old_visits.items()}
             # Gather all the Visit objects found in snap_dict, merging
             # duplicates for different snaps of the same detector.
@@ -405,32 +405,5 @@ def upload_dummy(visit, snap_id):
     time.sleep(SLEW_INTERVAL)
 
 
-def splice_group(visit, group):
-    """Replace the group ID in a Visit object.
-
-    Parameters
-    ----------
-    visit : `activator.Visit`
-        The object to update.
-    group : `str`
-        The new group ID to use.
-
-    Returns
-    -------
-    new_visit : `activator.Visit`
-        A visit with group ``group``, but otherwise identical to ``visit``.
-    """
-    return Visit(instrument=visit.instrument,
-                 detector=visit.detector,
-                 group=group,
-                 snaps=visit.snaps,
-                 filter=visit.filter,
-                 ra=visit.ra,
-                 dec=visit.dec,
-                 rot=visit.rot,
-                 kind=visit.kind,
-                 )
-
-
 if __name__ == "__main__":
     main()
diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py
index 4ad7e1ec..de0be936 100644
--- a/tests/test_middleware_interface.py
+++ b/tests/test_middleware_interface.py
@@ -19,6 +19,7 @@
 # You should have received a copy of the GNU General Public License
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
+import dataclasses
 import itertools
 import tempfile
 import os.path
@@ -236,31 +237,20 @@ def test_prep_butler_twice(self):
         self.interface.prep_butler(self.next_visit)
 
         # Second visit with everything same except group.
-        self.next_visit = Visit(instrument=self.next_visit.instrument,
-                                detector=self.next_visit.detector,
-                                group=str(int(self.next_visit.group) + 1),
-                                snaps=self.next_visit.snaps,
-                                filter=self.next_visit.filter,
-                                ra=self.next_visit.ra,
-                                dec=self.next_visit.dec,
-                                rot=self.next_visit.rot,
-                                kind=self.next_visit.kind)
+        self.next_visit = dataclasses.replace(self.next_visit, group=str(int(self.next_visit.group) + 1))
         self.interface.prep_butler(self.next_visit)
         expected_shards = {157394, 157401, 157405}
         self._check_imports(self.butler, detector=56, expected_shards=expected_shards)
 
         # Third visit with different detector and coordinates.
         # Only 5, 10, 56, 60 have valid calibs.
-        self.next_visit = Visit(instrument=self.next_visit.instrument,
-                                detector=5,
-                                group=str(int(self.next_visit.group) + 1),
-                                snaps=self.next_visit.snaps,
-                                filter=self.next_visit.filter,
-                                # Offset by a bit over 1 patch.
-                                ra=self.next_visit.ra + 0.4,
-                                dec=self.next_visit.dec - 0.4,
-                                rot=self.next_visit.rot,
-                                kind=self.next_visit.kind)
+        self.next_visit = dataclasses.replace(self.next_visit,
+                                              detector=5,
+                                              group=str(int(self.next_visit.group) + 1),
+                                              # Offset by a bit over 1 patch.
+                                              ra=self.next_visit.ra + 0.4,
+                                              dec=self.next_visit.dec - 0.4,
+                                              )
         self.interface.prep_butler(self.next_visit)
         expected_shards.update({157218, 157229})
         self._check_imports(self.butler, detector=5, expected_shards=expected_shards)

From a3706e0aab05c69de1ff4945dcb5f305ffae5db6 Mon Sep 17 00:00:00 2001
From: Krzysztof Findeisen
Date: Mon, 15 Aug 2022 11:02:58 -0700
Subject: [PATCH 03/20] Clarify input type of run_pipeline.

---
 python/activator/middleware_interface.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py
index 6157493e..73825527 100644
--- a/python/activator/middleware_interface.py
+++ b/python/activator/middleware_interface.py
@@ -472,14 +472,14 @@ def ingest_image(self, oid: str) -> None:
         assert len(result) == 1, "Should have ingested exactly one image."
         _log.info("Ingested one %s with dataId=%s", result[0].datasetType.name, result[0].dataId)
 
-    def run_pipeline(self, visit: Visit, exposure_ids: set) -> None:
+    def run_pipeline(self, visit: Visit, exposure_ids: set[int]) -> None:
         """Process the received image(s).
 
         Parameters
         ----------
         visit : Visit
             Group of snaps from one detector to be processed.
-        exposure_ids : `set`
+        exposure_ids : `set` [`int`]
             Identifiers of the exposures that were received.
         """
         # TODO: we want to define visits earlier, but we have to ingest a

From b625c35920b56a90534cb8b6fa05e42d72bb6d13 Mon Sep 17 00:00:00 2001
From: Krzysztof Findeisen
Date: Mon, 15 Aug 2022 11:39:59 -0700
Subject: [PATCH 04/20] Remove DM-34202 workaround from MiddlewareInterface.

The workaround involved keeping track of run-specific information in
MiddlewareInterface state, and hacking around SimplePipelineExecutor's
internal Butler management.
---
 python/activator/middleware_interface.py | 15 ++++-----------
 1 file changed, 4 insertions(+), 11 deletions(-)

diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py
index 73825527..9c0bea9a 100644
--- a/python/activator/middleware_interface.py
+++ b/python/activator/middleware_interface.py
@@ -383,23 +383,20 @@ def _prep_collections(self):
         #                                      'refcats'],
         #                              output=self.output_collection)
         # The below is taken from SimplePipelineExecutor.prep_butler.
-        # TODO DM-34202: save run collection in self for now, but we won't need
-        # it when we no longer need to work around DM-34202.
-        self.output_run = f"{self.output_collection}/{self.instrument.makeCollectionTimestamp()}"
+        output_run = f"{self.output_collection}/{self.instrument.makeCollectionTimestamp()}"
         self.butler.registry.registerCollection(self.instrument.makeDefaultRawIngestRunName(),
                                                 CollectionType.RUN)
-        self.butler.registry.registerCollection(self.output_run, CollectionType.RUN)
+        self.butler.registry.registerCollection(output_run, CollectionType.RUN)
         self.butler.registry.registerCollection(self.output_collection, CollectionType.CHAINED)
         collections = [self.instrument.makeUmbrellaCollectionName(),
                        self.instrument.makeDefaultRawIngestRunName(),
-                       self.output_run]
+                       output_run]
         self.butler.registry.setCollectionChain(self.output_collection, collections)
 
         # Need to create a new butler with all the output collections.
         self.butler = Butler(butler=self.butler,
                              collections=[self.output_collection],
-                             # TODO DM-34202: hack around a middleware bug.
-                             run=None)
+                             run=output_run)
 
     def _prep_pipeline(self, visit: Visit) -> None:
         """Setup the pipeline to be run, based on the configured instrument and
@@ -499,10 +496,6 @@ def run_pipeline(self, visit: Visit, exposure_ids: set[int]) -> None:
         if len(executor.quantum_graph) == 0:
             # TODO: a good place for a custom exception?
             raise RuntimeError("No data to process.")
-        # TODO DM-34202: hack around a middleware bug.
-        executor.butler = Butler(butler=self.butler,
-                                 collections=[self.output_collection],
-                                 run=self.output_run)
         _log.info(f"Running '{self.pipeline._pipelineIR.description}' on {where}")
         # If this is a fresh (local) repo, then types like calexp,
         # *Diff_diaSrcTable, etc. have not been registered.

From cdad18f87e0cee655526ef5f08635467a5ba9c0b Mon Sep 17 00:00:00 2001
From: Krzysztof Findeisen
Date: Wed, 17 Aug 2022 10:47:39 -0700
Subject: [PATCH 05/20] Merge internal attributes in MiddlewareInterface.

The merging of self.prefix and self.image_bucket into self.image_host
cuts down on the number of variables the programmer needs to track,
without changing any behavior.
---
 python/activator/middleware_interface.py | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py
index 9c0bea9a..adf6fb8b 100644
--- a/python/activator/middleware_interface.py
+++ b/python/activator/middleware_interface.py
@@ -72,7 +72,7 @@ class MiddlewareInterface:
         Local butler to process data in and hold calibrations, etc.; must be
         writeable.
     prefix : `str`, optional
-        URI identification prefix; prepended to ``image_bucket`` when
+        URI scheme followed by ``://``; prepended to ``image_bucket`` when
         constructing URIs to retrieve incoming files. The default is
         appropriate for use in the Google Cloud environment; typically only
         change this when running local tests.
@@ -84,15 +84,18 @@ class MiddlewareInterface:
     """The collection used for skymaps.
     """
 
+    # Class invariants:
+    # self.image_host is a valid URI with non-empty path and no query or fragment.
+    # self._download_store is None if and only if self.image_host is a local URI.
+
     def __init__(self, central_butler: Butler, image_bucket: str, instrument: str,
                  butler: Butler,
                  prefix: str = "gs://"):
         self.ip_apdb = os.environ["IP_APDB"]
-        self.prefix = prefix
         self.central_butler = central_butler
-        self.image_bucket = image_bucket
+        self.image_host = prefix + image_bucket
         # TODO: _download_store turns MWI into a tagged class; clean this up later
-        if not prefix.startswith("file"):
+        if not self.image_host.startswith("file"):
             self._download_store = tempfile.TemporaryDirectory(prefix="holding-")
         else:
             self._download_store = None
@@ -458,7 +461,7 @@ def ingest_image(self, oid: str) -> None:
             image bucket.
         """
         _log.info(f"Ingesting image id '{oid}'")
-        file = ResourcePath(f"{self.prefix}{self.image_bucket}/{oid}")
+        file = ResourcePath(f"{self.image_host}/{oid}")
         if not file.isLocal:
             # TODO: RawIngestTask doesn't currently support remote files.
             file = self._download(file)

From f97cb941b9b10cef39288a7d70114a76185c3a0a Mon Sep 17 00:00:00 2001
From: Krzysztof Findeisen
Date: Wed, 17 Aug 2022 11:19:40 -0700
Subject: [PATCH 06/20] Standardize subtask setup in MiddlewareInterface.

Making _init_ingester and _init_visit_definer parallel methods makes it
easier to see the control flow in __init__. The documentation also
clarifies what state is assumed by methods called from __init__.
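The construction sequence in __init__ now reads as three parallel steps
(sketch; the diff below is authoritative):

    self._init_local_butler(butler)   # must run first; sets up self.butler
    self._init_ingester()             # RawIngestTask bound to self.butler
    self._init_visit_definer()        # DefineVisitsTask bound to self.butler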
---
 python/activator/middleware_interface.py | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py
index adf6fb8b..981184a1 100644
--- a/python/activator/middleware_interface.py
+++ b/python/activator/middleware_interface.py
@@ -87,6 +87,7 @@ class MiddlewareInterface:
     # Class invariants:
     # self.image_host is a valid URI with non-empty path and no query or fragment.
     # self._download_store is None if and only if self.image_host is a local URI.
+    # self.instrument, self.camera, and self.skymap do not change after __init__.
 
     def __init__(self, central_butler: Butler, image_bucket: str, instrument: str,
                  butler: Butler,
@@ -105,9 +106,7 @@ def __init__(self, central_butler: Butler, image_bucket: str, instrument: str,
 
         self._init_local_butler(butler)
         self._init_ingester()
-
-        define_visits_config = lsst.obs.base.DefineVisitsConfig()
-        self.define_visits = lsst.obs.base.DefineVisitsTask(config=define_visits_config, butler=self.butler)
+        self._init_visit_definer()
 
         # HACK: explicit collection gets around the fact that we don't have any
         # timestamp/exposure information in a form we can pass to the Butler.
@@ -128,6 +127,9 @@ def __init__(self, central_butler: Butler, image_bucket: str, instrument: str,
     def _init_local_butler(self, butler: Butler):
         """Prepare the local butler to ingest into and process from.
 
+        ``self.instrument`` must already exist. ``self.butler`` is correctly
+        initialized after this method returns.
+
         Parameters
         ----------
         butler : `lsst.daf.butler.Butler`
@@ -143,6 +145,8 @@ def _init_local_butler(self, butler: Butler):
 
     def _init_ingester(self):
         """Prepare the raw file ingester to receive images into this butler.
+
+        ``self._init_local_butler`` must have already been run.
         """
         config = lsst.obs.base.RawIngestConfig()
         config.transfer = "copy"  # Copy files into the local butler.
@@ -152,6 +156,14 @@ def _init_ingester(self):
         self.rawIngestTask = lsst.obs.base.RawIngestTask(config=config,
                                                          butler=self.butler)
 
+    def _init_visit_definer(self):
+        """Prepare the visit definer to define visits for this butler.
+
+        ``self._init_local_butler`` must have already been run.
+        """
+        define_visits_config = lsst.obs.base.DefineVisitsConfig()
+        self.define_visits = lsst.obs.base.DefineVisitsTask(config=define_visits_config, butler=self.butler)
+
     def _predict_wcs(self, detector: lsst.afw.cameraGeom.Detector, visit: Visit) -> lsst.afw.geom.SkyWcs:
         """Calculate the expected detector WCS for an incoming observation.
 

From 6f40a58ee13be6ec4aa33024be111020fe8640f4 Mon Sep 17 00:00:00 2001
From: Krzysztof Findeisen
Date: Wed, 17 Aug 2022 12:10:59 -0700
Subject: [PATCH 07/20] Move Butler collection setup to MiddlewareInterface.__init__.

This change allows `MiddlewareInterface.butler` to have the same default
collections for the lifetime of the MWI object, preventing a possible
source of error in class code. Unfortunately, it's not possible to have
`.butler` be the same *object* for MWI lifetime because of how implicit
dimensions are handled.
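Schematically, the collections registered at construction time are
(sketch using the instrument's standard naming helpers; see
_init_local_butler in the diff below):

    umbrella = instrument.makeUmbrellaCollectionName()   # CHAINED; filled by prep_butler
    raws     = instrument.makeDefaultRawIngestRunName()  # RUN; filled on ingest
    output   = self.output_collection                    # CHAINED; chain = [umbrella, raws]

so `self.butler` can default to `output` for its whole lifetime, and
_prep_collections only has to register the per-execution output run.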
---
 python/activator/middleware_interface.py | 25 +++++++++++++++++++-----
 tests/test_middleware_interface.py       |  1 +
 2 files changed, 21 insertions(+), 5 deletions(-)

diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py
index 981184a1..239ea8d8 100644
--- a/python/activator/middleware_interface.py
+++ b/python/activator/middleware_interface.py
@@ -88,6 +88,9 @@ class MiddlewareInterface:
     # self.image_host is a valid URI with non-empty path and no query or fragment.
     # self._download_store is None if and only if self.image_host is a local URI.
     # self.instrument, self.camera, and self.skymap do not change after __init__.
+    # self.butler defaults to a chained collection named
+    # self.output_collection, which contains zero or more output runs,
+    # pre-made inputs, and raws, in that order.
 
     def __init__(self, central_butler: Butler, image_bucket: str, instrument: str,
                  butler: Butler,
@@ -138,10 +141,24 @@ def _init_local_butler(self, butler: Butler):
         """
         self.instrument.register(butler.registry)
 
+        # Will be populated in prep_butler.
+        butler.registry.registerCollection(self.instrument.makeUmbrellaCollectionName(),
+                                           CollectionType.CHAINED)
+        # Will be populated on ingest.
+        butler.registry.registerCollection(self.instrument.makeDefaultRawIngestRunName(), CollectionType.RUN)
+        # Will be populated on pipeline execution.
+        butler.registry.registerCollection(self.output_collection, CollectionType.CHAINED)
+        collections = [self.instrument.makeUmbrellaCollectionName(),
+                       self.instrument.makeDefaultRawIngestRunName(),
+                       ]
+        butler.registry.setCollectionChain(self.output_collection, collections)
+
         # Refresh butler after configuring it, to ensure all required
         # dimensions and collections are available.
         butler.registry.refresh()
-        self.butler = butler
+        self.butler = Butler(butler=butler,
+                             collections=[self.output_collection],
+                             )
 
     def _init_ingester(self):
         """Prepare the raw file ingester to receive images into this butler.
@@ -399,16 +416,14 @@ def _prep_collections(self):
         #                              output=self.output_collection)
         # The below is taken from SimplePipelineExecutor.prep_butler.
         output_run = f"{self.output_collection}/{self.instrument.makeCollectionTimestamp()}"
-        self.butler.registry.registerCollection(self.instrument.makeDefaultRawIngestRunName(),
-                                                CollectionType.RUN)
         self.butler.registry.registerCollection(output_run, CollectionType.RUN)
-        self.butler.registry.registerCollection(self.output_collection, CollectionType.CHAINED)
         collections = [self.instrument.makeUmbrellaCollectionName(),
                        self.instrument.makeDefaultRawIngestRunName(),
                        output_run]
         self.butler.registry.setCollectionChain(self.output_collection, collections)
 
-        # Need to create a new butler with all the output collections.
+        # TODO: Until DM-35941, need to create a new butler to update governor
+        # dimensions; refresh isn't enough.
         self.butler = Butler(butler=self.butler,
                              collections=[self.output_collection],
                              run=output_run)
diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py
index de0be936..934eca59 100644
--- a/tests/test_middleware_interface.py
+++ b/tests/test_middleware_interface.py
@@ -155,6 +155,7 @@ def test_init(self):
         # Check that the butler instance is properly configured.
         instruments = list(self.butler.registry.queryDimensionRecords("instrument"))
         self.assertEqual(instname, instruments[0].name)
+        self.assertEqual(set(self.interface.butler.collections), {self.interface.output_collection})
 
         # Check that the ingester is properly configured.
         self.assertEqual(self.interface.rawIngestTask.config.failFast, True)

From f719dc0df3ce0a0a233a7ccb7a6c24f32c657974 Mon Sep 17 00:00:00 2001
From: Krzysztof Findeisen
Date: Thu, 1 Sep 2022 13:22:25 -0700
Subject: [PATCH 08/20] Make explicit the pre- and postconditions of the main methods on MiddlewareInterface.

This documentation is intended primarily as an aid to future developers,
to highlight how dependent MWI is on a linear sequence of events driven
by `activator.py`. It may help with debugging future concurrency
problems or problems from unexpected sequences of jobs.
---
 python/activator/middleware_interface.py | 26 +++++++++++++++++++++++-
 1 file changed, 25 insertions(+), 1 deletion(-)

diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py
index 239ea8d8..5aaf1f72 100644
--- a/python/activator/middleware_interface.py
+++ b/python/activator/middleware_interface.py
@@ -54,6 +54,11 @@ class MiddlewareInterface:
     ingest the data when it is available, and run the difference imaging
     pipeline, all in that local butler.
 
+    Each instance may be used for processing more than one group-detector
+    combination, designated by the `Visit` parameter to certain methods. A
+    processing run may or may not share a group, detector, or both with a
+    previous run handled by the same object.
+
     Parameters
     ----------
     central_butler : `lsst.daf.butler.Butler`
@@ -90,7 +95,9 @@ class MiddlewareInterface:
     # self.instrument, self.camera, and self.skymap do not change after __init__.
     # self.butler defaults to a chained collection named
     # self.output_collection, which contains zero or more output runs,
-    # pre-made inputs, and raws, in that order.
+    # pre-made inputs, and raws, in that order. However, self.butler is not
+    # guaranteed to contain concrete data, or even the dimensions
+    # corresponding to self.camera and self.skymap.
 
     def __init__(self, central_butler: Butler, image_bucket: str, instrument: str,
                  butler: Butler,
@@ -234,6 +241,12 @@ def _detector_bounding_circle(self, detector: lsst.afw.cameraGeom.Detector,
     def prep_butler(self, visit: Visit) -> None:
         """Prepare a temporary butler repo for processing the incoming data.
 
+        After this method returns, the internal butler is guaranteed to
+        contain all data and all dimensions needed to run the appropriate
+        pipeline on ``visit``, except for ``raw`` and the ``exposure`` and
+        ``visit`` dimensions, respectively. It may contain other data that
+        would not be loaded when processing ``visit``.
+
         Parameters
         ----------
         visit : `Visit`
@@ -481,12 +494,19 @@ def _download(self, remote):
     def ingest_image(self, oid: str) -> None:
         """Ingest an image into the temporary butler.
 
+        The temporary butler must not already contain a ``raw`` dataset
+        corresponding to ``oid``. After this method returns, the temporary
+        butler contains one ``raw`` dataset corresponding to ``oid``, and the
+        appropriate ``exposure`` dimension.
+
         Parameters
         ----------
         oid : `str`
             Google storage identifier for incoming image, relative to the
             image bucket.
         """
+        # TODO: consider allowing pre-existing raws, as may happen when a
+        # pipeline is rerun (see DM-34141).
         _log.info(f"Ingesting image id '{oid}'")
         file = ResourcePath(f"{self.image_host}/{oid}")
         if not file.isLocal:
@@ -502,6 +522,10 @@ def ingest_image(self, oid: str) -> None:
     def run_pipeline(self, visit: Visit, exposure_ids: set[int]) -> None:
         """Process the received image(s).
 
+        The internal butler must contain all data and all dimensions needed to
+        run the appropriate pipeline on ``visit`` and ``exposure_ids``, except
+        for the ``visit`` dimension itself.
+
         Parameters
         ----------
         visit : Visit

From 73f9bf844515fafc0f41b6fa3eb473bd7e5aff2a Mon Sep 17 00:00:00 2001
From: Krzysztof Findeisen
Date: Wed, 17 Aug 2022 15:02:51 -0700
Subject: [PATCH 09/20] Move run definition to run_pipeline.

Keeping the run in run_pipeline instead of object state allows
`MiddlewareInterface.butler` to have the same default run for the
lifetime of the MWI object, preventing a possible source of error in
class code. It also makes `MiddlewareInterface` less sensitive to how
it's called from the activator, possibly heading off future concurrency
bugs.
---
 python/activator/middleware_interface.py | 27 ++++++++++++++++--------
 1 file changed, 18 insertions(+), 9 deletions(-)

diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py
index 5aaf1f72..76721a17 100644
--- a/python/activator/middleware_interface.py
+++ b/python/activator/middleware_interface.py
@@ -276,7 +276,11 @@ def prep_butler(self, visit: Visit) -> None:
                                     directory=self.central_butler.datastore.root,
                                     transfer="copy")
 
-        self._prep_collections()
+        # TODO: Until DM-35941, need to create a new butler to update governor
+        # dimensions; refresh isn't enough.
+        self.butler = Butler(butler=self.butler,
+                             collections=[self.output_collection],
+                             )
         self._prep_pipeline(visit)
 
     def _export_refcats(self, export, center, radius):
@@ -418,6 +422,11 @@ def get_key(ref):
 
     def _prep_collections(self):
         """Pre-register output collections in advance of running the pipeline.
+
+        Returns
+        -------
+        run : `str`
+            The name of a new run collection to use for outputs.
         """
         # NOTE: Because we receive a butler on init, we can't use this
         # prep_butler() because it takes a repo path.
@@ -434,12 +443,7 @@ def _prep_collections(self):
                        self.instrument.makeDefaultRawIngestRunName(),
                        output_run]
         self.butler.registry.setCollectionChain(self.output_collection, collections)
-
-        # TODO: Until DM-35941, need to create a new butler to update governor
-        # dimensions; refresh isn't enough.
-        self.butler = Butler(butler=self.butler,
-                             collections=[self.output_collection],
-                             run=output_run)
+        return output_run
 
     def _prep_pipeline(self, visit: Visit) -> None:
         """Setup the pipeline to be run, based on the configured instrument and
@@ -543,10 +547,15 @@ def run_pipeline(self, visit: Visit, exposure_ids: set[int]) -> None:
             # TODO: a good place for a custom exception?
             raise RuntimeError("No data to process.") from e
 
-        # TODO: can we move this from_pipeline call to prep_butler?
+        output_run = self._prep_collections()
+
         where = f"instrument='{visit.instrument}' and detector={visit.detector} " \
                 f"and exposure in ({','.join(str(x) for x in exposure_ids)})"
-        executor = SimplePipelineExecutor.from_pipeline(self.pipeline, where=where, butler=self.butler)
+        executor = SimplePipelineExecutor.from_pipeline(self.pipeline,
+                                                        where=where,
+                                                        butler=Butler(butler=self.butler,
+                                                                      collections=self.butler.collections,
+                                                                      run=output_run))
         if len(executor.quantum_graph) == 0:
             # TODO: a good place for a custom exception?
             raise RuntimeError("No data to process.")

From 2590269515d55760fbf793d57063d7d64aebf546 Mon Sep 17 00:00:00 2001
From: Krzysztof Findeisen
Date: Wed, 17 Aug 2022 12:31:12 -0700
Subject: [PATCH 10/20] Fix overwriting bug in MiddlewareInterface output chain.

The previous code would exclude old runs from the output chained
collection, causing them to dangle. The chain now behaves the way
command-line users of `pipetask run` would expect.
---
 python/activator/middleware_interface.py | 25 ++++++++++++++++++++----
 tests/test_middleware_interface.py       | 21 ++++++++++++++++++--
 2 files changed, 40 insertions(+), 6 deletions(-)

diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py
index 76721a17..eef0c8d7 100644
--- a/python/activator/middleware_interface.py
+++ b/python/activator/middleware_interface.py
@@ -439,10 +439,7 @@ def _prep_collections(self):
         # The below is taken from SimplePipelineExecutor.prep_butler.
         output_run = f"{self.output_collection}/{self.instrument.makeCollectionTimestamp()}"
         self.butler.registry.registerCollection(output_run, CollectionType.RUN)
-        collections = [self.instrument.makeUmbrellaCollectionName(),
-                       self.instrument.makeDefaultRawIngestRunName(),
-                       output_run]
-        self.butler.registry.setCollectionChain(self.output_collection, collections)
+        _prepend_collection(self.butler, self.output_collection, [output_run])
         return output_run
 
@@ -601,3 +598,23 @@ def _query_missing_datasets(src_repo: Butler, dest_repo: Butler,
     # this operation.
     return itertools.filterfalse(lambda ref: ref in known_datasets,
                                  src_repo.registry.queryDatasets(*args, **kwargs))
+
+
+def _prepend_collection(butler: Butler, chain: str, new_collections: collections.abc.Iterable[str]) -> None:
+    """Add a specific collection to the front of an existing chain.
+
+    Parameters
+    ----------
+    butler : `lsst.daf.butler.Butler`
+        The butler in which the collections exist.
+    chain : `str`
+        The chained collection to prepend to.
+    new_collections : iterable [`str`]
+        The collections to prepend to ``chain``.
+
+    Notes
+    -----
+    This function is not safe against concurrent modifications to ``chain``.
+    """
+    old_chain = butler.registry.getCollectionChain(chain)  # May be empty
+    butler.registry.setCollectionChain(chain, list(new_collections) + list(old_chain), flatten=False)
diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py
index 934eca59..1d3ad245 100644
--- a/tests/test_middleware_interface.py
+++ b/tests/test_middleware_interface.py
@@ -30,13 +30,13 @@
 import astropy.units as u
 import astro_metadata_translator
 
-from lsst.daf.butler import Butler, DataCoordinate
+from lsst.daf.butler import Butler, CollectionType, DataCoordinate
 from lsst.obs.base.formatters.fitsExposure import FitsImageFormatter
 from lsst.obs.base.ingest import RawFileDatasetInfo, RawFileData
 import lsst.resources
 
 from activator.visit import Visit
-from activator.middleware_interface import MiddlewareInterface, _query_missing_datasets
+from activator.middleware_interface import MiddlewareInterface, _query_missing_datasets, _prepend_collection
 
 # The short name of the instrument used in the test repo.
instname = "DECam" @@ -385,3 +385,20 @@ def test_query_missing_datasets_nodim(self): result = set(_query_missing_datasets(src_butler, existing_butler, "skyMap", ..., skymap="mymap")) src_butler.registry.queryDatasets.assert_called_once_with("skyMap", ..., skymap="mymap") self.assertEqual(result, {data1}) + + def test_prepend_collection(self): + self.butler.registry.registerCollection("_prepend1", CollectionType.TAGGED) + self.butler.registry.registerCollection("_prepend2", CollectionType.TAGGED) + self.butler.registry.registerCollection("_prepend3", CollectionType.TAGGED) + self.butler.registry.registerCollection("_prepend_base", CollectionType.CHAINED) + + # Empty chain. + self.assertEqual(list(self.butler.registry.getCollectionChain("_prepend_base")), []) + _prepend_collection(self.butler, "_prepend_base", ["_prepend1"]) + self.assertEqual(list(self.butler.registry.getCollectionChain("_prepend_base")), ["_prepend1"]) + + # Non-empty chain. + self.butler.registry.setCollectionChain("_prepend_base", ["_prepend1", "_prepend2"]) + _prepend_collection(self.butler, "_prepend_base", ["_prepend3"]) + self.assertEqual(list(self.butler.registry.getCollectionChain("_prepend_base")), + ["_prepend3", "_prepend1", "_prepend2"]) From 42fd4caf3928b5d19077aabc528b43ec63d07553 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Thu, 18 Aug 2022 12:06:22 -0700 Subject: [PATCH 11/20] Move pipeline definition to run_pipeline. Keeping the pipeline in run_pipeline instead of object state prevents `MiddlewareInterface.pipeline` from being sometimes defined and sometimes not, preventing a possible source of error in class code. It also makes `MiddlewareInterface` less sensitive to how it's called from the activator, possibly heading off future concurrency bugs. --- python/activator/middleware_interface.py | 18 ++++++++++++------ tests/test_middleware_interface.py | 10 +++++----- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index eef0c8d7..791e426b 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -281,7 +281,6 @@ def prep_butler(self, visit: Visit) -> None: self.butler = Butler(butler=self.butler, collections=[self.output_collection], ) - self._prep_pipeline(visit) def _export_refcats(self, export, center, radius): """Export the refcats for this visit from the central butler. @@ -451,6 +450,11 @@ def _prep_pipeline(self, visit: Visit) -> None: visit : Visit Group of snaps from one detector to prepare the pipeline for. + Returns + ------- + pipeline : `lsst.pipe.base.Pipeline` + The pipeline to run for ``visit``. + Raises ------ RuntimeError @@ -463,13 +467,14 @@ def _prep_pipeline(self, visit: Visit) -> None: ap_pipeline_file = os.path.join(getPackageDir("prompt_prototype"), "pipelines", self.instrument.getName(), "ApPipe.yaml") try: - self.pipeline = lsst.pipe.base.Pipeline.fromFile(ap_pipeline_file) + pipeline = lsst.pipe.base.Pipeline.fromFile(ap_pipeline_file) except FileNotFoundError: raise RuntimeError(f"No ApPipe.yaml defined for camera {self.instrument.getName()}") # TODO: Can we write to a configurable apdb schema, rather than # "postgres"? 
-        self.pipeline.addConfigOverride("diaPipe", "apdb.db_url",
-                                        f"postgresql://postgres@{self.ip_apdb}/postgres")
+        pipeline.addConfigOverride("diaPipe", "apdb.db_url",
+                                   f"postgresql://postgres@{self.ip_apdb}/postgres")
+        return pipeline
 
     def _download(self, remote):
         """Download an image located on a remote store.
@@ -548,7 +553,8 @@ def run_pipeline(self, visit: Visit, exposure_ids: set[int]) -> None:
         where = f"instrument='{visit.instrument}' and detector={visit.detector} " \
                 f"and exposure in ({','.join(str(x) for x in exposure_ids)})"
-        executor = SimplePipelineExecutor.from_pipeline(self.pipeline,
+        pipeline = self._prep_pipeline(visit)
+        executor = SimplePipelineExecutor.from_pipeline(pipeline,
                                                         where=where,
                                                         butler=Butler(butler=self.butler,
                                                                       collections=self.butler.collections,
                                                                       run=output_run))
         if len(executor.quantum_graph) == 0:
             # TODO: a good place for a custom exception?
             raise RuntimeError("No data to process.")
-        _log.info(f"Running '{self.pipeline._pipelineIR.description}' on {where}")
+        _log.info(f"Running '{pipeline._pipelineIR.description}' on {where}")
         # If this is a fresh (local) repo, then types like calexp,
         # *Diff_diaSrcTable, etc. have not been registered.
         result = executor.run(register_dataset_types=True)
diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py
index 1d3ad245..a8616bc2 100644
--- a/tests/test_middleware_interface.py
+++ b/tests/test_middleware_interface.py
@@ -224,10 +224,6 @@ def test_prep_butler(self):
         expected_shards = {157394, 157401, 157405}
         self._check_imports(self.butler, detector=56, expected_shards=expected_shards)
 
-        # Check that we configured the right pipeline.
-        self.assertEqual(self.interface.pipeline._pipelineIR.description,
-                         "End to end Alert Production pipeline specialized for HiTS-2015")
-
     def test_prep_butler_twice(self):
         """prep_butler should have the correct calibs (and not raise
         an exception!) on a second run with the same, or a different detector.
@@ -320,8 +316,12 @@ def test_run_pipeline(self):
             self.interface.ingest_image(filename)
 
         with unittest.mock.patch("activator.middleware_interface.SimplePipelineExecutor.run") as mock_run:
-            self.interface.run_pipeline(self.next_visit, {1})
+            with self.assertLogs(self.logger_name, level="INFO") as logs:
+                self.interface.run_pipeline(self.next_visit, {1})
         mock_run.assert_called_once_with(register_dataset_types=True)
+        # Check that we configured the right pipeline.
+        self.assertIn("End to end Alert Production pipeline specialized for HiTS-2015",
+                      "\n".join(logs.output))
 
     def test_run_pipeline_empty_quantum_graph(self):
         """Test that running a pipeline that results in an empty quantum graph

From b5e32f96fb4261841e32b52031a683337771edca Mon Sep 17 00:00:00 2001
From: Krzysztof Findeisen
Date: Wed, 24 Aug 2022 14:58:59 -0700
Subject: [PATCH 12/20] Ingest raws into group-specific runs.

Putting the raws in uniquely named collections prevents dataset
collisions in our test environment (where the same exposure may be
ingested as part of different groups) and makes it easier to clean up
the central repository if we want to start anew.

<instrument>/raw/all still exists as a chained collection, so any code
that assumes its existence should have the expected behavior.
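Concretely, for a DECam visit with a (hypothetical) group ID of "42",
raws now land in the run

    DECam/raw/42    # instrument.makeCollectionName("raw", visit.group)

which prep_butler chains into DECam/raw/all, so consumers of the
chained collection are unaffected.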
---
 python/activator/activator.py            |  2 +-
 python/activator/middleware_interface.py | 32 +++++++++++++++++++++---
 tests/test_middleware_interface.py       | 10 +++++---
 3 files changed, 36 insertions(+), 8 deletions(-)

diff --git a/python/activator/activator.py b/python/activator/activator.py
index 81528e65..b25e4cf4 100644
--- a/python/activator/activator.py
+++ b/python/activator/activator.py
@@ -176,7 +176,7 @@ def next_visit_handler() -> Tuple[str, int]:
                 )
                 if oid:
                     m = re.match(RAW_REGEXP, oid)
-                    mwi.ingest_image(oid)
+                    mwi.ingest_image(expected_visit, oid)
                     expid_set.add(m.group('expid'))
 
         _log.debug(f"Waiting for snaps from {expected_visit}.")
diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py
index 791e426b..8c5b5ab3 100644
--- a/python/activator/middleware_interface.py
+++ b/python/activator/middleware_interface.py
@@ -152,7 +152,8 @@ def _init_local_butler(self, butler: Butler):
         butler.registry.registerCollection(self.instrument.makeUmbrellaCollectionName(),
                                            CollectionType.CHAINED)
         # Will be populated on ingest.
-        butler.registry.registerCollection(self.instrument.makeDefaultRawIngestRunName(), CollectionType.RUN)
+        butler.registry.registerCollection(self.instrument.makeDefaultRawIngestRunName(),
+                                           CollectionType.CHAINED)
         # Will be populated on pipeline execution.
         butler.registry.registerCollection(self.output_collection, CollectionType.CHAINED)
         collections = [self.instrument.makeUmbrellaCollectionName(),
@@ -276,6 +277,14 @@ def prep_butler(self, visit: Visit) -> None:
                                     directory=self.central_butler.datastore.root,
                                     transfer="copy")
 
+        # Create unique run to store this visit's raws. This makes it easier to
+        # clean up the central repo later.
+        # TODO: not needed after DM-36051, or if we find a way to give all test
+        # raws unique exposure IDs.
+        input_raws = self._get_raw_run_name(visit)
+        self.butler.registry.registerCollection(input_raws, CollectionType.RUN)
+        _prepend_collection(self.butler, self.instrument.makeDefaultRawIngestRunName(), [input_raws])
+
         # TODO: Until DM-35941, need to create a new butler to update governor
         # dimensions; refresh isn't enough.
         self.butler = Butler(butler=self.butler,
                              collections=[self.output_collection],
                              )
@@ -419,6 +428,21 @@ def get_key(ref):
         for k, g in itertools.groupby(ordered, key=get_key):
             yield k, len(list(g))
 
+    def _get_raw_run_name(self, visit):
+        """Define a run collection specific to a particular visit.
+
+        Parameters
+        ----------
+        visit : Visit
+            The visit whose raws will be stored in this run.
+
+        Returns
+        -------
+        run : `str`
+            The name of a run collection to use for raws.
+        """
+        return self.instrument.makeCollectionName("raw", visit.group)
+
     def _prep_collections(self):
         """Pre-register output collections in advance of running the pipeline.
 
@@ -497,7 +521,7 @@ def _download(self, remote):
             local.transfer_from(remote, "copy")
         return local
 
-    def ingest_image(self, oid: str) -> None:
+    def ingest_image(self, visit: Visit, oid: str) -> None:
         """Ingest an image into the temporary butler.
 
         The temporary butler must not already contain a ``raw`` dataset
@@ -507,6 +531,8 @@ def ingest_image(self, oid: str) -> None:
 
         Parameters
         ----------
+        visit : Visit
+            The visit for which the image was taken.
         oid : `str`
             Google storage identifier for incoming image, relative to the
             image bucket.
@@ -518,7 +544,7 @@ def ingest_image(self, visit: Visit, oid: str) -> None:
         _log.info(f"Ingesting image id '{oid}'")
         file = ResourcePath(f"{self.image_host}/{oid}")
         if not file.isLocal:
             # TODO: RawIngestTask doesn't currently support remote files.
             file = self._download(file)
-        result = self.rawIngestTask.run([file])
+        result = self.rawIngestTask.run([file], run=self._get_raw_run_name(visit))
         # We only ingest one image at a time.
         # TODO: replace this assert with a custom exception, once we've decided
         # how we plan to handle exceptions in this code.
diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py
index a8616bc2..d6882e5b 100644
--- a/tests/test_middleware_interface.py
+++ b/tests/test_middleware_interface.py
@@ -256,6 +256,7 @@
     # This may be impossible to unit test, since it seems to depend on Google-side parallelism.
 
     def test_ingest_image(self):
+        self.interface.prep_butler(self.next_visit)  # Ensure raw collections exist.
         filename = "fakeRawImage.fits"
         filepath = os.path.join(self.input_data, filename)
         data_id, file_data = fake_file_data(filepath,
@@ -264,7 +265,7 @@ def test_ingest_image(self):
                                             self.next_visit)
         with unittest.mock.patch.object(self.interface.rawIngestTask, "extractMetadata") as mock:
             mock.return_value = file_data
-            self.interface.ingest_image(filename)
+            self.interface.ingest_image(self.next_visit, filename)
 
             datasets = list(self.butler.registry.queryDatasets('raw',
                                                                collections=[f'{instname}/raw/all']))
@@ -283,6 +284,7 @@ def test_ingest_image_fails_missing_file(self):
         through ingest_image(), we'll want to have a test of "missing file
         ingestion", and this can serve as a starting point.
         """
+        self.interface.prep_butler(self.next_visit)  # Ensure raw collections exist.
         filename = "nonexistentImage.fits"
         filepath = os.path.join(self.input_data, filename)
         data_id, file_data = fake_file_data(filepath,
@@ -292,7 +294,7 @@ def test_ingest_image_fails_missing_file(self):
         with unittest.mock.patch.object(self.interface.rawIngestTask, "extractMetadata") as mock, \
                 self.assertRaisesRegex(FileNotFoundError, "Resource at .* does not exist"):
             mock.return_value = file_data
-            self.interface.ingest_image(filename)
+            self.interface.ingest_image(self.next_visit, filename)
         # There should not be any raw files in the registry.
         datasets = list(self.butler.registry.queryDatasets('raw',
                                                            collections=[f'{instname}/raw/all']))
@@ -313,7 +315,7 @@ def test_run_pipeline(self):
                                             self.next_visit)
         with unittest.mock.patch.object(self.interface.rawIngestTask, "extractMetadata") as mock:
             mock.return_value = file_data
-            self.interface.ingest_image(filename)
+            self.interface.ingest_image(self.next_visit, filename)
 
         with unittest.mock.patch("activator.middleware_interface.SimplePipelineExecutor.run") as mock_run:
             with self.assertLogs(self.logger_name, level="INFO") as logs:
@@ -337,7 +339,7 @@ def test_run_pipeline_empty_quantum_graph(self):
                                             self.next_visit)
         with unittest.mock.patch.object(self.interface.rawIngestTask, "extractMetadata") as mock:
             mock.return_value = file_data
-            self.interface.ingest_image(filename)
+            self.interface.ingest_image(self.next_visit, filename)
 
         with self.assertRaisesRegex(RuntimeError, "No data to process"):
             self.interface.run_pipeline(self.next_visit, {2})

From 92ad27335143903c17f64174bd2334339cee4608 Mon Sep 17 00:00:00 2001
From: Krzysztof Findeisen
Date: Mon, 22 Aug 2022 16:49:35 -0700
Subject: [PATCH 13/20] Add MiddlewareInterface test class for writeable tests.

The new class allows unit tests to write safely to the central repo,
without slowing down non-writing tests with unnecessary safeguards.
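For reference, the repository copy uses Butler export/import rather
than Butler.transfer_from, which cannot easily copy collection
definitions. Condensed from _create_copied_repo in the diff below:

    with data_butler.export(filename=export_file.name) as export:
        export.saveDatasets(data_butler.registry.queryDatasets(..., collections=...))
        for collection in data_butler.registry.queryCollections():
            export.saveCollection(collection)
    central_butler.import_(directory=data_repo, filename=export_file.name, transfer="auto")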
---
 tests/test_middleware_interface.py | 93 ++++++++++++++++++++++++++++++
 1 file changed, 93 insertions(+)

diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py
index d6882e5b..d3a00947 100644
--- a/tests/test_middleware_interface.py
+++ b/tests/test_middleware_interface.py
@@ -404,3 +404,96 @@ def test_prepend_collection(self):
         _prepend_collection(self.butler, "_prepend_base", ["_prepend3"])
         self.assertEqual(list(self.butler.registry.getCollectionChain("_prepend_base")),
                          ["_prepend3", "_prepend1", "_prepend2"])
+
+
+class MiddlewareInterfaceWriteableTest(unittest.TestCase):
+    """Test the MiddlewareInterface class with faked data.
+
+    This class creates a fresh test repository for writing to. This means test
+    setup takes longer than for MiddlewareInterfaceTest, so it should be
+    used sparingly.
+    """
+    @classmethod
+    def setUpClass(cls):
+        super().setUpClass()
+
+        cls.env_patcher = unittest.mock.patch.dict(os.environ,
+                                                   {"IP_APDB": "localhost"})
+        cls.env_patcher.start()
+
+    @classmethod
+    def tearDownClass(cls):
+        super().tearDownClass()
+
+        cls.env_patcher.stop()
+
+    def _create_copied_repo(self):
+        """Create a fresh repository that's a copy of the test data.
+
+        This method sets self.central_repo and arranges cleanup; cleanup would
+        be awkward if this method returned a Butler instead.
+        """
+        # Copy test data to fresh Butler to allow write tests.
+        data_dir = os.path.join(os.path.abspath(os.path.dirname(__file__)), "data")
+        data_repo = os.path.join(data_dir, "central_repo")
+        data_butler = Butler(data_repo, writeable=False)
+        self.central_repo = tempfile.TemporaryDirectory()
+        # TemporaryDirectory warns on leaks
+        self.addCleanup(tempfile.TemporaryDirectory.cleanup, self.central_repo)
+
+        # Butler.transfer_from can't easily copy collections, so use
+        # export/import instead.
+        with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml") as export_file:
+            with data_butler.export(filename=export_file.name) as export:
+                export.saveDatasets(data_butler.registry.queryDatasets(..., collections=...))
+                for collection in data_butler.registry.queryCollections():
+                    export.saveCollection(collection)
+            central_butler = Butler(Butler.makeRepo(self.central_repo.name), writeable=True)
+            central_butler.import_(directory=data_repo, filename=export_file.name, transfer="auto")
+
+    def setUp(self):
+        self._create_copied_repo()
+        central_butler = Butler(self.central_repo.name,
+                                instrument=instname,
+                                skymap="deepCoadd_skyMap",
+                                collections=[f"{instname}/defaults"],
+                                writeable=False)
+        instrument = "lsst.obs.decam.DarkEnergyCamera"
+        data_dir = os.path.join(os.path.abspath(os.path.dirname(__file__)), "data")
+        self.input_data = os.path.join(data_dir, "input_data")
+        repo = tempfile.TemporaryDirectory()
+        # TemporaryDirectory warns on leaks; addCleanup also keeps the TD from
+        # getting garbage-collected.
+        self.addCleanup(tempfile.TemporaryDirectory.cleanup, repo)
+        self.butler = Butler(Butler.makeRepo(repo.name), writeable=True)
+
+        # coordinates from DECam data in ap_verify_ci_hits2015 for visit 411371
+        ra = 155.4702849608958
+        dec = -4.950050405424033
+        # DECam has no rotator; instrument angle is 90 degrees in our system.
+        rot = 90.
+        self.next_visit = Visit(instrument,
+                                detector=56,
+                                group="1",
+                                snaps=1,
+                                filter=filter,
+                                ra=ra,
+                                dec=dec,
+                                rot=rot,
+                                kind="SURVEY")
+        self.logger_name = "lsst.activator.middleware_interface"
+
+        # Populate repository.
+        self.interface = MiddlewareInterface(central_butler, self.input_data, instrument, self.butler,
+                                             prefix="file://")
+        self.interface.prep_butler(self.next_visit)
+        filename = "fakeRawImage.fits"
+        filepath = os.path.join(self.input_data, filename)
+        self.raw_data_id, file_data = fake_file_data(filepath,
+                                                     self.butler.dimensions,
+                                                     self.interface.instrument,
+                                                     self.next_visit)
+        with unittest.mock.patch.object(self.interface.rawIngestTask, "extractMetadata") as mock:
+            mock.return_value = file_data
+            self.interface.ingest_image(filename)
+        self.interface.define_visits.run([self.raw_data_id])

From fac49b463a6184c9be23a306c61255520dd94e12 Mon Sep 17 00:00:00 2001
From: Krzysztof Findeisen
Date: Tue, 23 Aug 2022 14:37:22 -0700
Subject: [PATCH 14/20] Implement MiddlewareInterface.export_outputs.

This implementation is fairly conservative, assuming that anything
other than a successful transfer (such as being run when there are no
outputs) is an error.

Butler.transfer_from has serious concurrency problems that will need to
be resolved later. Export/import is safer because it lets us avoid
performing any potentially-deadlocked queries on governor dimensions.
---
 python/activator/activator.py            |   2 +-
 python/activator/middleware_interface.py | 117 ++++++++++++++++++++++-
 tests/test_middleware_interface.py       |  54 ++++++++++-
 3 files changed, 168 insertions(+), 5 deletions(-)

diff --git a/python/activator/activator.py b/python/activator/activator.py
index b25e4cf4..3f3fbe4c 100644
--- a/python/activator/activator.py
+++ b/python/activator/activator.py
@@ -84,7 +84,7 @@
 # the central repo is located, either, so perhaps we need a new module.
 central_butler = Butler(calib_repo,
                         collections=[active_instrument.makeCollectionName("defaults")],
-                        writeable=False,
+                        writeable=True,
                         inferDefaults=False)
 repo = f"/tmp/butler-{os.getpid()}"
 butler = Butler(Butler.makeRepo(repo), writeable=True)
diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py
index 8c5b5ab3..6ecb29b1 100644
--- a/python/activator/middleware_interface.py
+++ b/python/activator/middleware_interface.py
@@ -27,6 +27,7 @@
 import os
 import os.path
 import tempfile
+import typing
 
 from lsst.utils import getPackageDir
 from lsst.resources import ResourcePath
@@ -59,6 +60,11 @@ class MiddlewareInterface:
     processing run may or may not share a group, detector, or both with a
     previous run handled by the same object.
 
+    ``MiddlewareInterface`` objects are not thread- or process-safe, and must
+    not share any state with other instances (see ``butler`` in the parameter
+    list). They may, however, share a ``central_butler``, and concurrent
+    operations on this butler are guaranteed to be appropriately synchronized.
+
     Parameters
     ----------
     central_butler : `lsst.daf.butler.Butler`
@@ -75,7 +81,8 @@ class MiddlewareInterface:
         use of the butler.
     butler : `lsst.daf.butler.Butler`
         Local butler to process data in and hold calibrations, etc.; must be
-        writeable.
+        writeable. Must not be shared with any other ``MiddlewareInterface``
+        object.
     prefix : `str`, optional
         URI scheme followed by ``://``; prepended to ``image_bucket`` when
@@ -449,7 +456,8 @@ def _prep_collections(self):
         Returns
         -------
         run : `str`
-            The name of a new run collection to use for outputs.
+            The name of a new run collection to use for outputs. The run name
+            is prefixed by ``self.output_collection``.
""" # NOTE: Because we receive a butler on init, we can't use this # prep_butler() because it takes a repo path. @@ -595,6 +603,111 @@ def run_pipeline(self, visit: Visit, exposure_ids: set[int]) -> None: _log.info(f"Pipeline successfully run on {len(result)} quanta for " f"detector {visit.detector} of {exposure_ids}.") + def export_outputs(self, visit: Visit, exposure_ids: set[int]) -> None: + """Copy raws and pipeline outputs from processing a set of images back + to the central Butler. + + The copied raws can be found in the collection ``/raw/all``, + while the outputs can be found in ``"/prompt-results"``. + + Parameters + ---------- + visit : Visit + The visit whose outputs need to be exported. + exposure_ids : `set` [`int`] + Identifiers of the exposures that were processed. + """ + # TODO: this method will not be responsible for raws after DM-36051. + self._export_subset(visit, exposure_ids, "raw", + in_collections=self._get_raw_run_name(visit), + out_collection=self.instrument.makeDefaultRawIngestRunName()) + umbrella = self.instrument.makeCollectionName("prompt-results") + self._export_subset(visit, exposure_ids, + # TODO: find a way to merge datasets like *_config + # or *_schema that are duplicated across multiple + # workers. + self._get_safe_dataset_types(self.butler), + self.output_collection + "/*", # exclude inputs + umbrella) + _log.info(f"Pipeline products saved to collection '{umbrella}' for " + f"detector {visit.detector} of {exposure_ids}.") + + @staticmethod + def _get_safe_dataset_types(butler): + """Return the set of dataset types that can be safely merged from a worker. + + Parameters + ---------- + butler : `lsst.daf.butler.Butler` + The butler in which to search for dataset types. + + Returns + ------- + types : iterable [`str`] + The dataset types to return. + """ + return [dstype for dstype in butler.registry.queryDatasetTypes(...) + if "detector" in dstype.dimensions] + + def _export_subset(self, visit: Visit, exposure_ids: set[int], + dataset_types: typing.Any, in_collections: typing.Any, out_collection: str) -> None: + """Copy datasets associated with a processing run back to the + central Butler. + + Parameters + ---------- + visit : Visit + The visit whose outputs need to be exported. + exposure_ids : `set` [`int`] + Identifiers of the exposures that were processed. + dataset_types + The dataset type(s) to transfer; can be any expression described in + :ref:`daf_butler_dataset_type_expressions`. + in_collections + The collections to transfer from; can be any expression described + in :ref:`daf_butler_collection_expressions`. + out_collection : `str` + The chained collection in which to include the datasets. Need not + exist before the call. + """ + try: + # Need to iterate over datasets at least twice, so list. + datasets = list(self.butler.registry.queryDatasets( + dataset_types, + collections=in_collections, + # in_collections may include other runs, so need to filter. + # Since AP processing is strictly visit-detector, these three + # dimensions should suffice. + # DO NOT assume that visit == exposure! 
+                where=f"exposure in ({', '.join(str(x) for x in exposure_ids)})",
+                instrument=self.instrument.getName(),
+                detector=visit.detector,
+            ))
+        except lsst.daf.butler.registry.DataIdError as e:
+            raise ValueError("Invalid visit or exposures.") from e
+        if not datasets:
+            raise ValueError(f"No datasets match visit={visit} and exposures={exposure_ids}.")
+
+        with tempfile.NamedTemporaryFile(mode="w+b", suffix=".yaml") as export_file:
+            # MUST NOT export governor dimensions, as this causes deadlocks in
+            # central registry. Can omit most other dimensions (all dimensions,
+            # after DM-36051) to avoid locks or redundant work.
+            # TODO: saveDatasets(elements={"exposure", "visit"}) doesn't work.
+            # Use import(skip_dimensions) until DM-36062 is fixed.
+            with self.butler.export(filename=export_file.name) as export:
+                export.saveDatasets(datasets)
+            self.central_butler.import_(filename=export_file.name,
+                                        directory=self.butler.datastore.root,
+                                        skip_dimensions={"instrument", "detector",
+                                                         "skymap", "tract", "patch"},
+                                        transfer="copy")
+        # No-op if collection already exists.
+        self.central_butler.registry.registerCollection(out_collection, CollectionType.CHAINED)
+        runs = {ref.run for ref in datasets}
+        # Don't unlink any previous runs.
+        # TODO: need to secure this against concurrent modification
+        _prepend_collection(self.central_butler, out_collection, runs)
 
 
 def _query_missing_datasets(src_repo: Butler, dest_repo: Butler,
                             *args, **kwargs) -> collections.abc.Iterable[lsst.daf.butler.DatasetRef]:
diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py
index d3a00947..114b1364 100644
--- a/tests/test_middleware_interface.py
+++ b/tests/test_middleware_interface.py
@@ -30,7 +30,9 @@
 import astropy.units as u
 import astro_metadata_translator
 
+import lsst.afw.image
 from lsst.daf.butler import Butler, CollectionType, DataCoordinate
+import lsst.daf.butler.tests as butler_tests
 from lsst.obs.base.formatters.fitsExposure import FitsImageFormatter
 from lsst.obs.base.ingest import RawFileDatasetInfo, RawFileData
 import lsst.resources
@@ -452,12 +454,15 @@ def _create_copied_repo(self):
             central_butler.import_(directory=data_repo, filename=export_file.name, transfer="auto")
 
     def setUp(self):
+        # TODO: test two parallel repos once DM-36051 fixed; can't do it
+        # earlier because the test data has only one raw.
+
         self._create_copied_repo()
         central_butler = Butler(self.central_repo.name,
                                 instrument=instname,
                                 skymap="deepCoadd_skyMap",
                                 collections=[f"{instname}/defaults"],
-                                writeable=False)
+                                writeable=True)
         instrument = "lsst.obs.decam.DarkEnergyCamera"
         data_dir = os.path.join(os.path.abspath(os.path.dirname(__file__)), "data")
         self.input_data = os.path.join(data_dir, "input_data")
@@ -495,5 +500,50 @@ def setUp(self):
                                                      self.next_visit)
         with unittest.mock.patch.object(self.interface.rawIngestTask, "extractMetadata") as mock:
             mock.return_value = file_data
-            self.interface.ingest_image(filename)
+            self.interface.ingest_image(self.next_visit, filename)
         self.interface.define_visits.run([self.raw_data_id])
+
+        # Simulate pipeline execution.
+        exp = lsst.afw.image.ExposureF(20, 20)
+        run = self.interface._prep_collections()
+        self.processed_data_id = {(k if k != "exposure" else "visit"): v for k, v in self.raw_data_id.items()}
+        # Dataset types defined for local Butler on pipeline run, but no
+        # guarantee this happens in central Butler.
+        butler_tests.addDatasetType(self.interface.butler, "calexp", {"instrument", "visit", "detector"},
+                                    "ExposureF")
+        self.interface.butler.put(exp, "calexp", self.processed_data_id, run=run)
+
+    def _check_datasets(self, butler, types, collections, count, data_id):
+        datasets = list(butler.registry.queryDatasets(types, collections=collections))
+        self.assertEqual(len(datasets), count)
+        for dataset in datasets:
+            self.assertEqual(dataset.dataId, data_id)
+
+    def test_export_outputs(self):
+        self.interface.export_outputs(self.next_visit, {self.raw_data_id["exposure"]})
+
+        central_butler = Butler(self.central_repo.name, writeable=False)
+        raw_collection = f"{instname}/raw/all"
+        export_collection = f"{instname}/prompt-results"
+        self._check_datasets(central_butler,
+                             "raw", raw_collection, 1, self.raw_data_id)
+        # Did not export raws directly to raw/all.
+        self.assertNotEqual(central_butler.registry.getCollectionType(raw_collection), CollectionType.RUN)
+        self._check_datasets(central_butler,
+                             "calexp", export_collection, 1, self.processed_data_id)
+        # Did not export calibs or other inputs.
+        self._check_datasets(central_butler,
+                             ["cpBias", "gaia", "skyMap", "*Coadd"], export_collection,
+                             0, {"error": "dnc"})
+        # Nothing placed in "input" collections.
+        self._check_datasets(central_butler,
+                             ["raw", "calexp"], f"{instname}/defaults", 0, {"error": "dnc"})
+
+    def test_export_outputs_bad_visit(self):
+        bad_visit = dataclasses.replace(self.next_visit, detector=88)
+        with self.assertRaises(ValueError):
+            self.interface.export_outputs(bad_visit, {self.raw_data_id["exposure"]})
+
+    def test_export_outputs_bad_exposure(self):
+        with self.assertRaises(ValueError):
+            self.interface.export_outputs(self.next_visit, {88})

From 14e89ad5f4ef6097eb44c8a43c13fc9823d09ada Mon Sep 17 00:00:00 2001
From: Krzysztof Findeisen
Date: Wed, 24 Aug 2022 11:12:20 -0700
Subject: [PATCH 15/20] Call export_outputs in main pipeline sequence.

At present, export_outputs is called only if the pipeline run
succeeded. More generally, it should be called if the processing will
*not* be retried, i.e., on success or on permanent failure.
---
 python/activator/activator.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/python/activator/activator.py b/python/activator/activator.py
index 3f3fbe4c..9480d499 100644
--- a/python/activator/activator.py
+++ b/python/activator/activator.py
@@ -227,6 +227,9 @@ def next_visit_handler() -> Tuple[str, int]:
                 _log.warning(f"Processing {len(expid_set)} snaps, expected {expected_visit.snaps}.")
             _log.info(f"Running pipeline on {expected_visit}.")
             mwi.run_pipeline(expected_visit, expid_set)
+            # TODO: broadcast alerts here
+            # TODO: call export_outputs on success or permanent failure in DM-34141
+            mwi.export_outputs(expected_visit, expid_set)
             return "Pipeline executed", 200
         else:
             _log.error(f"Timed out waiting for images for {expected_visit}.")

From 0e59a9394826c222905d982444b22673e75d0a62 Mon Sep 17 00:00:00 2001
From: Krzysztof Findeisen
Date: Wed, 24 Aug 2022 15:56:20 -0700
Subject: [PATCH 16/20] Clarify use of timestamped collections for output runs.

A previous commit had tried to replace these with group-specific
collections, but this caused a rare bug when the same worker was used
to process two detectors from the same group. The explanatory comment
should prevent future developers from making the same mistake.
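For reference, the run name is derived in _prep_collections as

    output_run = f"{self.output_collection}/{self.instrument.makeCollectionTimestamp()}"

A per-execution timestamp is unique even when one worker handles two
detectors from the same group; a group-based name such as
f"{self.output_collection}/{visit.group}" (the approach this commit
warns against) is not.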
--- python/activator/middleware_interface.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index 6ecb29b1..88d226fd 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -468,6 +468,12 @@ def _prep_collections(self): # 'refcats'], # output=self.output_collection) # The below is taken from SimplePipelineExecutor.prep_butler. + # TODO: currently the run **must** be unique for each unit of + # processing. It must be unique per group because in the prototype the + # same exposures may be rerun under different group IDs, and they must + # be unique per detector because the same worker may be tasked with + # different detectors from the same group. Replace with a single + # universal run on DM-36586. output_run = f"{self.output_collection}/{self.instrument.makeCollectionTimestamp()}" self.butler.registry.registerCollection(output_run, CollectionType.RUN) _prepend_collection(self.butler, self.output_collection, [output_run]) From 52013788670c942d0d1e5d2b81c4b8378aa423ab Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Thu, 1 Sep 2022 13:52:49 -0700 Subject: [PATCH 17/20] Fix deprecation warning from refObjLoader. The `ref_dataset_name` config field is no longer needed nor used. --- pipelines/calibrate.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pipelines/calibrate.py b/pipelines/calibrate.py index 4a580681..1e3d7b97 100644 --- a/pipelines/calibrate.py +++ b/pipelines/calibrate.py @@ -6,10 +6,8 @@ # band with the most depth). config.connections.astromRefCat = "gaia" -config.astromRefObjLoader.ref_dataset_name = config.connections.astromRefCat config.astromRefObjLoader.anyFilterMapsToThis = "phot_g_mean" config.astromRefObjLoader.filterMap = {} # Use panstarrs for photometry (grizy filters). config.connections.photoRefCat = "panstarrs" -config.photoRefObjLoader.ref_dataset_name = config.connections.photoRefCat From fea4d44ca1ef86e22cd5d5c25b34504002ae6873 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Tue, 6 Sep 2022 15:23:50 -0700 Subject: [PATCH 18/20] Use a single Butler object in MiddlewareInterface. Replacing the Butler was previously necessary because default dimensions are initialized only on Butler construction. The removal of default dimensions makes this no longer necessary, and having a single Butler object associated with MiddlewareInterface is easier to think about and makes repository behavior more predictable. --- python/activator/middleware_interface.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index 88d226fd..59dd0c61 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -292,11 +292,8 @@ def prep_butler(self, visit: Visit) -> None: self.butler.registry.registerCollection(input_raws, CollectionType.RUN) _prepend_collection(self.butler, self.instrument.makeDefaultRawIngestRunName(), [input_raws]) - # TODO: Until DM-35941, need to create a new butler to update governor - # dimensions; refresh isn't enough. - self.butler = Butler(butler=self.butler, - collections=[self.output_collection], - ) + # Update for new collections, dimensions, and datasets. + self.butler.registry.refresh() def _export_refcats(self, export, center, radius): """Export the refcats for this visit from the central butler. 
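
The registry refresh introduced here (and removed again in the next patch)
works around the fact that a long-lived Butler caches registry state:
collections, dataset types, and dimension records written through another
connection become visible only after an explicit re-read. A minimal
illustration, with a placeholder repo path:

    from lsst.daf.butler import Butler

    butler = Butler("/tmp/local_repo", writeable=False)  # placeholder path
    # ... a second process registers new collections or dimension records ...
    butler.registry.refresh()  # re-reads collections, dataset types, dimensions

Once every write goes through the single Butler held by MiddlewareInterface,
its caches are updated in place, which is why the next patch can drop the
refresh calls entirely.
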
From 14a0e53f0a5fb7066c8372a2d46925710d3a6c25 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Tue, 6 Sep 2022 15:29:16 -0700 Subject: [PATCH 19/20] Remove internal use of Registry.refresh. Now that a single Butler object is being used throughout, these refreshes should no longer be necessary. --- python/activator/middleware_interface.py | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index 59dd0c61..921c365f 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -168,9 +168,10 @@ def _init_local_butler(self, butler: Butler): ] butler.registry.setCollectionChain(self.output_collection, collections) - # Refresh butler after configuring it, to ensure all required - # dimensions and collections are available. - butler.registry.refresh() + # Internal Butler keeps a reference to the newly prepared collection. + # This reference makes visible any inputs for query purposes. Output + # runs are execution-specific and must be provided explicitly to the + # appropriate calls. self.butler = Butler(butler=butler, collections=[self.output_collection], ) @@ -266,9 +267,6 @@ def prep_butler(self, visit: Visit) -> None: wcs = self._predict_wcs(detector, visit) center, radius = self._detector_bounding_circle(detector, wcs) - # Need up-to-date census of what's already present. - self.butler.registry.refresh() - with tempfile.NamedTemporaryFile(mode="w+b", suffix=".yaml") as export_file: with self.central_butler.export(filename=export_file.name, format="yaml") as export: self._export_refcats(export, center, radius) @@ -292,9 +290,6 @@ def prep_butler(self, visit: Visit) -> None: self.butler.registry.registerCollection(input_raws, CollectionType.RUN) _prepend_collection(self.butler, self.instrument.makeDefaultRawIngestRunName(), [input_raws]) - # Update for new collections, dimensions, and datasets. - self.butler.registry.refresh() - def _export_refcats(self, export, center, radius): """Export the refcats for this visit from the central butler. From 507b924dee2953e34edc930d9450d4e5f50433c6 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Mon, 10 Oct 2022 15:19:45 -0700 Subject: [PATCH 20/20] Use daily instead of weekly in main container builds. This change lets us respond faster to upstream bugfixes in Science Pipelines. --- Dockerfile.main | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile.main b/Dockerfile.main index 31f7a8d3..f38c017c 100644 --- a/Dockerfile.main +++ b/Dockerfile.main @@ -1,4 +1,4 @@ -FROM lsstsqre/centos:w_latest +FROM lsstsqre/centos:d_latest ENV PYTHONUNBUFFERED True RUN source /opt/lsst/software/stack/loadLSST.bash \ && mamba install -y \