From cb5a3c88d0def025dfb2bd5f2f45d3d80b4ce99f Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Tue, 28 Mar 2023 13:50:54 -0700 Subject: [PATCH 1/4] Factor local repo initialization into an external function. The make_local_repo function makes it possible to decouple the lifetime of the local repo from the lifetime of individual MiddlewareInterface objects, allowing more flexibility in how the latter are created. --- python/activator/middleware_interface.py | 71 +++++++++++++++++------- tests/test_middleware_interface.py | 13 ++++- 2 files changed, 63 insertions(+), 21 deletions(-) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index 749770e8..765af3ed 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -19,7 +19,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -__all__ = ["get_central_butler", "MiddlewareInterface"] +__all__ = ["get_central_butler", "make_local_repo", "MiddlewareInterface"] import collections.abc import datetime @@ -76,6 +76,53 @@ def get_central_butler(central_repo: str, instrument_class: str): ) +def make_local_repo(local_storage: str, central_butler: Butler, instrument: str): + """Create and configure a new local repository. + + The repository is represented by a temporary directory object, which can be + used to manage its lifetime. + + Parameters + ---------- + local_storage : `str` + An absolute path to a space where this function can create a local + Butler repo. + central_butler : `lsst.daf.butler.Butler` + Butler repo containing instrument and skymap definitions. + instrument : `str` + Name of the instrument taking the data, for populating + butler collections and dataIds. May be either the fully qualified class + name or the short name. Examples: "LsstCam", "lsst.obs.lsst.LsstCam". + + Returns + ------- + repo_dir + An object of the same type as returned by `tempfile.TemporaryDirectory`, + pointing to the local repo location. + """ + repo_dir = tempfile.TemporaryDirectory(dir=local_storage, prefix="butler-") + butler = Butler(Butler.makeRepo(repo_dir.name), writeable=True) + _log.info("Created local Butler repo at %s.", repo_dir.name) + + # Run-once repository initialization + + instrument = lsst.obs.base.Instrument.from_string(instrument, central_butler.registry) + instrument.register(butler.registry) + + butler.registry.registerCollection(instrument.makeUmbrellaCollectionName(), + CollectionType.CHAINED) + butler.registry.registerCollection(instrument.makeDefaultRawIngestRunName(), + CollectionType.RUN) + output_collection = instrument.makeCollectionName("prompt") + butler.registry.registerCollection(output_collection, CollectionType.CHAINED) + collections = [instrument.makeUmbrellaCollectionName(), + instrument.makeDefaultRawIngestRunName(), + ] + butler.registry.setCollectionChain(output_collection, collections) + + return repo_dir + + class MiddlewareInterface: """Interface layer between the Butler middleware and the prompt processing data handling system, to handle processing individual images. @@ -219,31 +266,15 @@ def _init_local_butler(self, base_path: str): An absolute path to a space where the repo can be created. """ # Directory has same lifetime as this object. - self._repo = tempfile.TemporaryDirectory(dir=base_path, prefix="butler-") - butler = Butler(Butler.makeRepo(self._repo.name), writeable=True) - _log.info("Created local Butler repo at %s.", self._repo.name) - - self.instrument.register(butler.registry) - - # Will be populated in prep_butler. - butler.registry.registerCollection(self.instrument.makeUmbrellaCollectionName(), - CollectionType.CHAINED) - # Will be populated on ingest. - butler.registry.registerCollection(self.instrument.makeDefaultRawIngestRunName(), - CollectionType.RUN) - # Will be populated on pipeline execution. - butler.registry.registerCollection(self.output_collection, CollectionType.CHAINED) - collections = [self.instrument.makeUmbrellaCollectionName(), - self.instrument.makeDefaultRawIngestRunName(), - ] - butler.registry.setCollectionChain(self.output_collection, collections) + self._repo = make_local_repo(base_path, self.central_butler, self.instrument.getName()) # Internal Butler keeps a reference to the newly prepared collection. # This reference makes visible any inputs for query purposes. Output # runs are execution-specific and must be provided explicitly to the # appropriate calls. - self.butler = Butler(butler=butler, + self.butler = Butler(self._repo.name, collections=[self.output_collection], + writeable=True, ) def _init_ingester(self): diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index 4cc58360..bc06b59c 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -41,7 +41,7 @@ import lsst.resources from activator.visit import Visit -from activator.middleware_interface import get_central_butler, MiddlewareInterface, \ +from activator.middleware_interface import get_central_butler, make_local_repo, MiddlewareInterface, \ _filter_datasets, _prepend_collection, _remove_from_chain, _MissingDatasetError # The short name of the instrument used in the test repo. @@ -178,6 +178,17 @@ def test_get_butler(self): self.assertEqual(list(butler.collections), [f"{instname}/defaults"]) self.assertTrue(butler.isWriteable()) + def test_make_local_repo(self): + for inst in [instname, "lsst.obs.decam.DarkEnergyCamera"]: + with make_local_repo(self.workspace.name, Butler(self.central_repo), inst) as repo_dir: + self.assertTrue(os.path.exists(repo_dir)) + butler = Butler(repo_dir) + self.assertEqual([x.dataId for x in butler.registry.queryDimensionRecords("instrument")], + [DataCoordinate.standardize({"instrument": instname}, + universe=butler.dimensions)]) + self.assertIn(f"{instname}/defaults", butler.registry.queryCollections()) + self.assertFalse(os.path.exists(repo_dir)) + def test_init(self): """Basic tests of the initialized interface object. """ From 8081a825bd0a87d800839b9437313ced4dacb996 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Tue, 28 Mar 2023 15:16:11 -0700 Subject: [PATCH 2/4] Define local repo outside MiddlewareInterface object. This change makes it possible for multiple MiddlewareInterface objects to be defined, in order, for the same local repo. Since MiddlewareInterface no longer guarantees that it has exclusive access to its repo, I've added a pre-emptive refresh call before any operations that access it. --- python/activator/activator.py | 9 +++-- python/activator/middleware_interface.py | 47 ++++++++++++------------ tests/test_middleware_interface.py | 18 ++++----- 3 files changed, 38 insertions(+), 36 deletions(-) diff --git a/python/activator/activator.py b/python/activator/activator.py index 3b2795f8..dfd8370d 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -35,7 +35,7 @@ from .logger import setup_usdf_logger from .make_pgpass import make_pgpass -from .middleware_interface import get_central_butler, MiddlewareInterface +from .middleware_interface import get_central_butler, make_local_repo, MiddlewareInterface from .raw import Snap from .visit import Visit @@ -84,12 +84,15 @@ storage_client = boto3.client('s3', endpoint_url=s3_endpoint) +central_butler = get_central_butler(calib_repo, instrument_name) +# local_repo is a temporary directory with the same lifetime as this process. +local_repo = make_local_repo(local_repos, central_butler, instrument_name) # Initialize middleware interface. -mwi = MiddlewareInterface(get_central_butler(calib_repo, instrument_name), +mwi = MiddlewareInterface(central_butler, image_bucket, instrument_name, skymap, - local_repos) + local_repo.name) def check_for_snap( diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index 765af3ed..7f8d7ef6 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -138,10 +138,9 @@ class MiddlewareInterface: is no guarantee that a processing run may, or may not, share a group, detector, or both with a previous run handled by the same object. - ``MiddlewareInterface`` objects are not thread- or process-safe, and must - not share any state with other instances (see ``butler`` in the parameter - list). They may, however, share a ``central_butler``, and concurrent - operations on this butler are guaranteed to be appropriately synchronized. + ``MiddlewareInterface`` objects are not thread- or process-safe. It is up + to the client to avoid conflicts from multiple objects trying to access the + same local repo. Parameters ---------- @@ -160,9 +159,9 @@ class MiddlewareInterface: use of the butler. skymap: `str` Name of the skymap in the central repo for querying templates. - local_storage : `str` - An absolute path to a space where this object can create a local - Butler repo. The repo is guaranteed to be unique to this object. + local_repo : `str` + A URI to the local Butler repo, which is assumed to already exist and + contain standard collections and the registration of ``instrument``. prefix : `str`, optional URI scheme followed by ``://``; prepended to ``image_bucket`` when constructing URIs to retrieve incoming files. The default is @@ -179,15 +178,15 @@ class MiddlewareInterface: # self._download_store is None if and only if self.image_host is a local URI. # self.instrument, self.camera, self.skymap, self._deployment do not change # after __init__. - # self._repo is the only reference to its TemporaryDirectory object. # self.butler defaults to a chained collection named - # self.output_collection, which contains zero or more output runs, - # pre-made inputs, and raws, in that order. However, self.butler is not + # self.output_collection, which contains zero or more output runs + # and all pipeline inputs, in that order. However, self.butler is not # guaranteed to contain concrete data, or even the dimensions - # corresponding to self.camera and self.skymap. + # corresponding to self.camera and self.skymap. Do not assume that + # self.butler is the only Butler pointing to the local repo. def __init__(self, central_butler: Butler, image_bucket: str, instrument: str, - skymap: str, local_storage: str, + skymap: str, local_repo: str, prefix: str = "s3://"): # Deployment/version ID -- potentially expensive to generate. self._deployment = self._get_deployment() @@ -205,7 +204,7 @@ def __init__(self, central_butler: Butler, image_bucket: str, instrument: str, self.output_collection = self.instrument.makeCollectionName("prompt") - self._init_local_butler(local_storage) + self._init_local_butler(local_repo) self._init_ingester() self._init_visit_definer() @@ -253,26 +252,22 @@ def _make_apdb_uri(self): user_apdb = os.environ.get("USER_APDB", "postgres") return f"postgresql://{user_apdb}@{ip_apdb}/{db_apdb}" - def _init_local_butler(self, base_path: str): + def _init_local_butler(self, repo_uri: str): """Prepare the local butler to ingest into and process from. ``self.instrument`` must already exist. ``self.butler`` is correctly - initialized after this method returns, and is guaranteed to be unique - to this object. + initialized after this method returns. Parameters ---------- - base_path : `str` - An absolute path to a space where the repo can be created. + repo_uri : `str` + A URI to the location of the local repository. """ - # Directory has same lifetime as this object. - self._repo = make_local_repo(base_path, self.central_butler, self.instrument.getName()) - # Internal Butler keeps a reference to the newly prepared collection. # This reference makes visible any inputs for query purposes. Output # runs are execution-specific and must be provided explicitly to the # appropriate calls. - self.butler = Butler(self._repo.name, + self.butler = Butler(repo_uri, collections=[self.output_collection], writeable=True, ) @@ -389,9 +384,10 @@ def prep_butler(self, visit: Visit) -> None: wcs = self._predict_wcs(detector, visit) center, radius = self._detector_bounding_circle(detector, wcs) - # central repo may have been modified by other MWI instances. + # repos may have been modified by other MWI instances. # TODO: get a proper synchronization API for Butler self.central_butler.registry.refresh() + self.butler.registry.refresh() with tempfile.NamedTemporaryFile(mode="w+b", suffix=".yaml") as export_file: with self.central_butler.export(filename=export_file.name, format="yaml") as export: @@ -638,6 +634,7 @@ def _prep_collections(self, visit: Visit): is prefixed by ``self.output_collection``. """ output_run = self._get_output_run(visit) + self.butler.registry.refresh() self.butler.registry.registerCollection(output_run, CollectionType.RUN) # As of Feb 2023, this moves output_run to the front of the chain if # it's already present, but this behavior cannot be relied upon. @@ -856,6 +853,9 @@ def _export_subset(self, visit: Visit, exposure_ids: set[int], The collections to transfer from; can be any expression described in :ref:`daf_butler_collection_expressions`. """ + # local repo may have been modified by other MWI instances. + self.butler.registry.refresh() + try: # Need to iterate over datasets at least twice, so list. datasets = list(self.butler.registry.queryDatasets( @@ -904,6 +904,7 @@ def clean_local_repo(self, visit: Visit, exposure_ids: set[int]) -> None: exposure_ids : `set` [`int`] Identifiers of the exposures to be removed. """ + self.butler.registry.refresh() raws = self.butler.registry.queryDatasets( 'raw', collections=self.instrument.makeDefaultRawIngestRunName(), diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index bc06b59c..d9aedab2 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -133,9 +133,9 @@ def setUp(self): instrument = "lsst.obs.decam.DarkEnergyCamera" self.input_data = os.path.join(data_dir, "input_data") # Have to preserve the tempdir, so that it doesn't get cleaned up. - self.workspace = tempfile.TemporaryDirectory() + self.local_repo = make_local_repo(tempfile.gettempdir(), central_butler, instname) self.interface = MiddlewareInterface(central_butler, self.input_data, instrument, - skymap_name, self.workspace.name, + skymap_name, self.local_repo.name, prefix="file://") # coordinates from DECam data in ap_verify_ci_hits2015 for visit 411371 @@ -164,8 +164,7 @@ def setUp(self): def tearDown(self): super().tearDown() # TemporaryDirectory warns on leaks - self.interface._repo.cleanup() # TODO: should MiddlewareInterface have a cleanup method? - self.workspace.cleanup() + self.local_repo.cleanup() def test_get_butler(self): for butler in [get_central_butler(self.central_repo, "lsst.obs.decam.DarkEnergyCamera"), @@ -180,7 +179,7 @@ def test_get_butler(self): def test_make_local_repo(self): for inst in [instname, "lsst.obs.decam.DarkEnergyCamera"]: - with make_local_repo(self.workspace.name, Butler(self.central_repo), inst) as repo_dir: + with make_local_repo(tempfile.gettempdir(), Butler(self.central_repo), inst) as repo_dir: self.assertTrue(os.path.exists(repo_dir)) butler = Butler(repo_dir) self.assertEqual([x.dataId for x in butler.registry.queryDimensionRecords("instrument")], @@ -611,10 +610,11 @@ def setUp(self): instrument = "lsst.obs.decam.DarkEnergyCamera" data_dir = os.path.join(os.path.abspath(os.path.dirname(__file__)), "data") self.input_data = os.path.join(data_dir, "input_data") - workspace = tempfile.TemporaryDirectory() + + local_repo = make_local_repo(tempfile.gettempdir(), central_butler, instname) # TemporaryDirectory warns on leaks; addCleanup also keeps the TD from # getting garbage-collected. - self.addCleanup(tempfile.TemporaryDirectory.cleanup, workspace) + self.addCleanup(tempfile.TemporaryDirectory.cleanup, local_repo) # coordinates from DECam data in ap_verify_ci_hits2015 for visit 411371 ra = 155.4702849608958 @@ -642,10 +642,8 @@ def setUp(self): # Populate repository. self.interface = MiddlewareInterface(central_butler, self.input_data, instrument, - skymap_name, workspace.name, + skymap_name, local_repo.name, prefix="file://") - # TODO: should MiddlewareInterface have a cleanup method? - self.addCleanup(tempfile.TemporaryDirectory.cleanup, self.interface._repo) self.interface.prep_butler(self.next_visit) filename = "fakeRawImage.fits" filepath = os.path.join(self.input_data, filename) From 92ce8cbffc69efc583f900a47d52db5dba5fe9c4 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Tue, 28 Mar 2023 15:48:03 -0700 Subject: [PATCH 3/4] Make MiddlewareInterface per-visit, not per-process. Using a new object for each visit makes it possible to start processing from an (almost) clean slate without needing to carefully manage object state. Since the local repo is shared between objects, we still get the benefits of local dataset caching. --- python/activator/activator.py | 13 +++++++------ python/activator/middleware_interface.py | 7 +++---- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/python/activator/activator.py b/python/activator/activator.py index dfd8370d..50b03630 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -87,12 +87,6 @@ central_butler = get_central_butler(calib_repo, instrument_name) # local_repo is a temporary directory with the same lifetime as this process. local_repo = make_local_repo(local_repos, central_butler, instrument_name) -# Initialize middleware interface. -mwi = MiddlewareInterface(central_butler, - image_bucket, - instrument_name, - skymap, - local_repo.name) def check_for_snap( @@ -235,6 +229,13 @@ def next_visit_handler() -> Tuple[str, int]: f"Expected {instrument_name}, received {expected_visit.instrument}." expid_set = set() + # Create a fresh MiddlewareInterface object to avoid accidental + # "cross-talk" between different visits. + mwi = MiddlewareInterface(central_butler, + image_bucket, + instrument_name, + skymap, + local_repo.name) # Copy calibrations for this detector/visit mwi.prep_butler(expected_visit) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index 7f8d7ef6..f48a290b 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -133,10 +133,9 @@ class MiddlewareInterface: ingest the data when it is available, and run the difference imaging pipeline, all in that local butler. - Each instance may be used for processing more than one group-detector - combination, designated by the `Visit` parameter to certain methods. There - is no guarantee that a processing run may, or may not, share a group, - detector, or both with a previous run handled by the same object. + Each instance must be used for processing only one group-detector + combination. The object may contain state that is unique to a particular + processing run. ``MiddlewareInterface`` objects are not thread- or process-safe. It is up to the client to avoid conflicts from multiple objects trying to access the From 8ab8cbdfc8a760543913c9b470310e2072d3a684 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Thu, 6 Apr 2023 14:46:33 -0700 Subject: [PATCH 4/4] Replace SimplePipelineExecutor with SeperablePipelineExecutor. SeparablePipelineExecutor is configured to give equivalent behavior to the old use of SimplePipelineExecutor. Actually taking advantage of its new capabilities will be done on later tickets. --- python/activator/middleware_interface.py | 16 ++++++++-------- tests/test_middleware_interface.py | 11 +++++++++-- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index f48a290b..1c858996 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -34,7 +34,7 @@ from lsst.utils import getPackageDir from lsst.resources import ResourcePath import lsst.afw.cameraGeom -from lsst.ctrl.mpexec import SimplePipelineExecutor +from lsst.ctrl.mpexec import SeparablePipelineExecutor from lsst.daf.butler import Butler, CollectionType import lsst.geom from lsst.meas.algorithms.htmIndexer import HtmIndexer @@ -777,17 +777,17 @@ def run_pipeline(self, visit: Visit, exposure_ids: set[int]) -> None: output_run_butler = Butler(butler=self.butler, collections=(self._get_init_output_run(visit), ) + self.butler.collections, run=output_run) - executor = SimplePipelineExecutor.from_pipeline(pipeline, - where=where, - butler=output_run_butler) - if len(executor.quantum_graph) == 0: + executor = SeparablePipelineExecutor(output_run_butler, clobber_output=False, skip_existing_in=None) + qgraph = executor.make_quantum_graph(pipeline, where=where) + if len(qgraph) == 0: # TODO: a good place for a custom exception? raise RuntimeError("No data to process.") - _log.info(f"Running '{pipeline._pipelineIR.description}' on {where}") # If this is a fresh (local) repo, then types like calexp, # *Diff_diaSrcTable, etc. have not been registered. - result = executor.run(register_dataset_types=True) - _log.info(f"Pipeline successfully run on {len(result)} quanta for " + executor.pre_execute_qgraph(qgraph, register_dataset_types=True, save_init_outputs=True) + _log.info(f"Running '{pipeline._pipelineIR.description}' on {where}") + executor.run_pipeline(qgraph) + _log.info(f"Pipeline successfully run on " f"detector {visit.detector} of {exposure_ids}.") def export_outputs(self, visit: Visit, exposure_ids: set[int]) -> None: diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index d9aedab2..a059f24d 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -358,10 +358,17 @@ def test_run_pipeline(self): mock.return_value = file_data self.interface.ingest_image(self.next_visit, filename) - with unittest.mock.patch("activator.middleware_interface.SimplePipelineExecutor.run") as mock_run: + with unittest.mock.patch( + "activator.middleware_interface.SeparablePipelineExecutor.pre_execute_qgraph") \ + as mock_preexec, \ + unittest.mock.patch("activator.middleware_interface.SeparablePipelineExecutor.run_pipeline") \ + as mock_run: with self.assertLogs(self.logger_name, level="INFO") as logs: self.interface.run_pipeline(self.next_visit, {1}) - mock_run.assert_called_once_with(register_dataset_types=True) + mock_preexec.assert_called_once() + # Pre-execution may have other arguments as needed; no requirement either way. + self.assertEqual(mock_preexec.call_args.kwargs["register_dataset_types"], True) + mock_run.assert_called_once() # Check that we configured the right pipeline. self.assertIn("End to end Alert Production pipeline specialized for HiTS-2015", "\n".join(logs.output))