diff --git a/python/activator/activator.py b/python/activator/activator.py
index 3b2795f8..50b03630 100644
--- a/python/activator/activator.py
+++ b/python/activator/activator.py
@@ -35,7 +35,7 @@
 from .logger import setup_usdf_logger
 from .make_pgpass import make_pgpass
-from .middleware_interface import get_central_butler, MiddlewareInterface
+from .middleware_interface import get_central_butler, make_local_repo, MiddlewareInterface
 from .raw import Snap
 from .visit import Visit
@@ -84,12 +84,9 @@
 storage_client = boto3.client('s3', endpoint_url=s3_endpoint)

-# Initialize middleware interface.
-mwi = MiddlewareInterface(get_central_butler(calib_repo, instrument_name),
-                          image_bucket,
-                          instrument_name,
-                          skymap,
-                          local_repos)
+central_butler = get_central_butler(calib_repo, instrument_name)
+# local_repo is a temporary directory with the same lifetime as this process.
+local_repo = make_local_repo(local_repos, central_butler, instrument_name)


 def check_for_snap(
@@ -232,6 +229,13 @@ def next_visit_handler() -> Tuple[str, int]:
             f"Expected {instrument_name}, received {expected_visit.instrument}."

         expid_set = set()
+        # Create a fresh MiddlewareInterface object to avoid accidental
+        # "cross-talk" between different visits.
+        mwi = MiddlewareInterface(central_butler,
+                                  image_bucket,
+                                  instrument_name,
+                                  skymap,
+                                  local_repo.name)

         # Copy calibrations for this detector/visit
         mwi.prep_butler(expected_visit)
diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py
index 749770e8..1c858996 100644
--- a/python/activator/middleware_interface.py
+++ b/python/activator/middleware_interface.py
@@ -19,7 +19,7 @@
 # You should have received a copy of the GNU General Public License
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.

-__all__ = ["get_central_butler", "MiddlewareInterface"]
+__all__ = ["get_central_butler", "make_local_repo", "MiddlewareInterface"]

 import collections.abc
 import datetime
@@ -34,7 +34,7 @@
 from lsst.utils import getPackageDir
 from lsst.resources import ResourcePath
 import lsst.afw.cameraGeom
-from lsst.ctrl.mpexec import SimplePipelineExecutor
+from lsst.ctrl.mpexec import SeparablePipelineExecutor
 from lsst.daf.butler import Butler, CollectionType
 import lsst.geom
 from lsst.meas.algorithms.htmIndexer import HtmIndexer
@@ -76,6 +76,53 @@ def get_central_butler(central_repo: str, instrument_class: str):
     )


+def make_local_repo(local_storage: str, central_butler: Butler, instrument: str):
+    """Create and configure a new local repository.
+
+    The repository is represented by a temporary directory object, which can be
+    used to manage its lifetime.
+
+    Parameters
+    ----------
+    local_storage : `str`
+        An absolute path to a space where this function can create a local
+        Butler repo.
+    central_butler : `lsst.daf.butler.Butler`
+        Butler repo containing instrument and skymap definitions.
+    instrument : `str`
+        Name of the instrument taking the data, for populating
+        butler collections and dataIds. May be either the fully qualified class
+        name or the short name. Examples: "LsstCam", "lsst.obs.lsst.LsstCam".
+
+    Returns
+    -------
+    repo_dir
+        An object of the same type as returned by `tempfile.TemporaryDirectory`,
+        pointing to the local repo location.
+    """
+    repo_dir = tempfile.TemporaryDirectory(dir=local_storage, prefix="butler-")
+    butler = Butler(Butler.makeRepo(repo_dir.name), writeable=True)
+    _log.info("Created local Butler repo at %s.", repo_dir.name)
+
+    # Run-once repository initialization
+
+    instrument = lsst.obs.base.Instrument.from_string(instrument, central_butler.registry)
+    instrument.register(butler.registry)
+
+    butler.registry.registerCollection(instrument.makeUmbrellaCollectionName(),
+                                       CollectionType.CHAINED)
+    butler.registry.registerCollection(instrument.makeDefaultRawIngestRunName(),
+                                       CollectionType.RUN)
+    output_collection = instrument.makeCollectionName("prompt")
+    butler.registry.registerCollection(output_collection, CollectionType.CHAINED)
+    collections = [instrument.makeUmbrellaCollectionName(),
+                   instrument.makeDefaultRawIngestRunName(),
+                   ]
+    butler.registry.setCollectionChain(output_collection, collections)
+
+    return repo_dir
+
+
 class MiddlewareInterface:
     """Interface layer between the Butler middleware and the prompt processing
     data handling system, to handle processing individual images.
@@ -86,15 +133,13 @@
     ingest the data when it is available, and run the difference imaging
     pipeline, all in that local butler.

-    Each instance may be used for processing more than one group-detector
-    combination, designated by the `Visit` parameter to certain methods. There
-    is no guarantee that a processing run may, or may not, share a group,
-    detector, or both with a previous run handled by the same object.
+    Each instance must be used for processing only one group-detector
+    combination. The object may contain state that is unique to a particular
+    processing run.

-    ``MiddlewareInterface`` objects are not thread- or process-safe, and must
-    not share any state with other instances (see ``butler`` in the parameter
-    list). They may, however, share a ``central_butler``, and concurrent
-    operations on this butler are guaranteed to be appropriately synchronized.
+    ``MiddlewareInterface`` objects are not thread- or process-safe. It is up
+    to the client to avoid conflicts from multiple objects trying to access the
+    same local repo.

     Parameters
     ----------
@@ -113,9 +158,9 @@
         use of the butler.
     skymap: `str`
         Name of the skymap in the central repo for querying templates.
-    local_storage : `str`
-        An absolute path to a space where this object can create a local
-        Butler repo. The repo is guaranteed to be unique to this object.
+    local_repo : `str`
+        A URI to the local Butler repo, which is assumed to already exist and
+        contain standard collections and the registration of ``instrument``.
     prefix : `str`, optional
         URI scheme followed by ``://``; prepended to ``image_bucket`` when
         constructing URIs to retrieve incoming files. The default is
@@ -132,15 +177,15 @@
     # self._download_store is None if and only if self.image_host is a local URI.
     # self.instrument, self.camera, self.skymap, self._deployment do not change
     # after __init__.
-    # self._repo is the only reference to its TemporaryDirectory object.
     # self.butler defaults to a chained collection named
-    # self.output_collection, which contains zero or more output runs,
-    # pre-made inputs, and raws, in that order. However, self.butler is not
+    # self.output_collection, which contains zero or more output runs
+    # and all pipeline inputs, in that order. However, self.butler is not
     # guaranteed to contain concrete data, or even the dimensions
-    # corresponding to self.camera and self.skymap.
+    # corresponding to self.camera and self.skymap. Do not assume that
+    # self.butler is the only Butler pointing to the local repo.

     def __init__(self, central_butler: Butler, image_bucket: str, instrument: str,
-                 skymap: str, local_storage: str,
+                 skymap: str, local_repo: str,
                  prefix: str = "s3://"):
         # Deployment/version ID -- potentially expensive to generate.
         self._deployment = self._get_deployment()
@@ -158,7 +203,7 @@ def __init__(self, central_butler: Butler, image_bucket: str, instrument: str,

         self.output_collection = self.instrument.makeCollectionName("prompt")

-        self._init_local_butler(local_storage)
+        self._init_local_butler(local_repo)
         self._init_ingester()
         self._init_visit_definer()

@@ -206,44 +251,24 @@ def _make_apdb_uri(self):
         user_apdb = os.environ.get("USER_APDB", "postgres")
         return f"postgresql://{user_apdb}@{ip_apdb}/{db_apdb}"

-    def _init_local_butler(self, base_path: str):
+    def _init_local_butler(self, repo_uri: str):
         """Prepare the local butler to ingest into and process from.

         ``self.instrument`` must already exist. ``self.butler`` is correctly
-        initialized after this method returns, and is guaranteed to be unique
-        to this object.
+        initialized after this method returns.

         Parameters
         ----------
-        base_path : `str`
-            An absolute path to a space where the repo can be created.
+        repo_uri : `str`
+            A URI to the location of the local repository.
         """
-        # Directory has same lifetime as this object.
-        self._repo = tempfile.TemporaryDirectory(dir=base_path, prefix="butler-")
-        butler = Butler(Butler.makeRepo(self._repo.name), writeable=True)
-        _log.info("Created local Butler repo at %s.", self._repo.name)
-
-        self.instrument.register(butler.registry)
-
-        # Will be populated in prep_butler.
-        butler.registry.registerCollection(self.instrument.makeUmbrellaCollectionName(),
-                                           CollectionType.CHAINED)
-        # Will be populated on ingest.
-        butler.registry.registerCollection(self.instrument.makeDefaultRawIngestRunName(),
-                                           CollectionType.RUN)
-        # Will be populated on pipeline execution.
-        butler.registry.registerCollection(self.output_collection, CollectionType.CHAINED)
-        collections = [self.instrument.makeUmbrellaCollectionName(),
-                       self.instrument.makeDefaultRawIngestRunName(),
-                       ]
-        butler.registry.setCollectionChain(self.output_collection, collections)
-
         # Internal Butler keeps a reference to the newly prepared collection.
         # This reference makes visible any inputs for query purposes. Output
         # runs are execution-specific and must be provided explicitly to the
         # appropriate calls.
-        self.butler = Butler(butler=butler,
+        self.butler = Butler(repo_uri,
                              collections=[self.output_collection],
+                             writeable=True,
                              )

     def _init_ingester(self):
@@ -358,9 +383,10 @@ def prep_butler(self, visit: Visit) -> None:
         wcs = self._predict_wcs(detector, visit)
         center, radius = self._detector_bounding_circle(detector, wcs)

-        # central repo may have been modified by other MWI instances.
+        # repos may have been modified by other MWI instances.
         # TODO: get a proper synchronization API for Butler
         self.central_butler.registry.refresh()
+        self.butler.registry.refresh()

         with tempfile.NamedTemporaryFile(mode="w+b", suffix=".yaml") as export_file:
             with self.central_butler.export(filename=export_file.name, format="yaml") as export:
@@ -607,6 +633,7 @@ def _prep_collections(self, visit: Visit):
        is prefixed by ``self.output_collection``.
         """
         output_run = self._get_output_run(visit)
+        self.butler.registry.refresh()
         self.butler.registry.registerCollection(output_run, CollectionType.RUN)
         # As of Feb 2023, this moves output_run to the front of the chain if
         # it's already present, but this behavior cannot be relied upon.
@@ -750,17 +777,17 @@ def run_pipeline(self, visit: Visit, exposure_ids: set[int]) -> None:
         output_run_butler = Butler(butler=self.butler,
                                    collections=(self._get_init_output_run(visit), ) + self.butler.collections,
                                    run=output_run)
-        executor = SimplePipelineExecutor.from_pipeline(pipeline,
-                                                        where=where,
-                                                        butler=output_run_butler)
-        if len(executor.quantum_graph) == 0:
+        executor = SeparablePipelineExecutor(output_run_butler, clobber_output=False, skip_existing_in=None)
+        qgraph = executor.make_quantum_graph(pipeline, where=where)
+        if len(qgraph) == 0:
             # TODO: a good place for a custom exception?
             raise RuntimeError("No data to process.")
-        _log.info(f"Running '{pipeline._pipelineIR.description}' on {where}")
         # If this is a fresh (local) repo, then types like calexp,
         # *Diff_diaSrcTable, etc. have not been registered.
-        result = executor.run(register_dataset_types=True)
-        _log.info(f"Pipeline successfully run on {len(result)} quanta for "
+        executor.pre_execute_qgraph(qgraph, register_dataset_types=True, save_init_outputs=True)
+        _log.info(f"Running '{pipeline._pipelineIR.description}' on {where}")
+        executor.run_pipeline(qgraph)
+        _log.info(f"Pipeline successfully run on "
                   f"detector {visit.detector} of {exposure_ids}.")

     def export_outputs(self, visit: Visit, exposure_ids: set[int]) -> None:
@@ -825,6 +852,9 @@ def _export_subset(self, visit: Visit, exposure_ids: set[int],
             The collections to transfer from; can be any expression described
             in :ref:`daf_butler_collection_expressions`.
         """
+        # local repo may have been modified by other MWI instances.
+        self.butler.registry.refresh()
+
         try:
             # Need to iterate over datasets at least twice, so list.
             datasets = list(self.butler.registry.queryDatasets(
@@ -873,6 +903,7 @@ def clean_local_repo(self, visit: Visit, exposure_ids: set[int]) -> None:
         exposure_ids : `set` [`int`]
             Identifiers of the exposures to be removed.
         """
+        self.butler.registry.refresh()
         raws = self.butler.registry.queryDatasets(
             'raw',
             collections=self.instrument.makeDefaultRawIngestRunName(),
diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py
index 4cc58360..a059f24d 100644
--- a/tests/test_middleware_interface.py
+++ b/tests/test_middleware_interface.py
@@ -41,7 +41,7 @@
 import lsst.resources

 from activator.visit import Visit
-from activator.middleware_interface import get_central_butler, MiddlewareInterface, \
+from activator.middleware_interface import get_central_butler, make_local_repo, MiddlewareInterface, \
     _filter_datasets, _prepend_collection, _remove_from_chain, _MissingDatasetError

 # The short name of the instrument used in the test repo.
@@ -133,9 +133,9 @@ def setUp(self):
         instrument = "lsst.obs.decam.DarkEnergyCamera"
         self.input_data = os.path.join(data_dir, "input_data")
         # Have to preserve the tempdir, so that it doesn't get cleaned up.
-        self.workspace = tempfile.TemporaryDirectory()
+        self.local_repo = make_local_repo(tempfile.gettempdir(), central_butler, instname)
         self.interface = MiddlewareInterface(central_butler, self.input_data, instrument,
-                                             skymap_name, self.workspace.name,
+                                             skymap_name, self.local_repo.name,
                                              prefix="file://")

         # coordinates from DECam data in ap_verify_ci_hits2015 for visit 411371
@@ -164,8 +164,7 @@ def setUp(self):
     def tearDown(self):
         super().tearDown()
         # TemporaryDirectory warns on leaks
-        self.interface._repo.cleanup()  # TODO: should MiddlewareInterface have a cleanup method?
-        self.workspace.cleanup()
+        self.local_repo.cleanup()

     def test_get_butler(self):
         for butler in [get_central_butler(self.central_repo, "lsst.obs.decam.DarkEnergyCamera"),
@@ -178,6 +177,17 @@
             self.assertEqual(list(butler.collections), [f"{instname}/defaults"])
             self.assertTrue(butler.isWriteable())

+    def test_make_local_repo(self):
+        for inst in [instname, "lsst.obs.decam.DarkEnergyCamera"]:
+            with make_local_repo(tempfile.gettempdir(), Butler(self.central_repo), inst) as repo_dir:
+                self.assertTrue(os.path.exists(repo_dir))
+                butler = Butler(repo_dir)
+                self.assertEqual([x.dataId for x in butler.registry.queryDimensionRecords("instrument")],
+                                 [DataCoordinate.standardize({"instrument": instname},
+                                                             universe=butler.dimensions)])
+                self.assertIn(f"{instname}/defaults", butler.registry.queryCollections())
+            self.assertFalse(os.path.exists(repo_dir))
+
     def test_init(self):
         """Basic tests of the initialized interface object.
         """
@@ -348,10 +358,17 @@ def test_run_pipeline(self):
             mock.return_value = file_data
             self.interface.ingest_image(self.next_visit, filename)

-        with unittest.mock.patch("activator.middleware_interface.SimplePipelineExecutor.run") as mock_run:
+        with unittest.mock.patch(
+                "activator.middleware_interface.SeparablePipelineExecutor.pre_execute_qgraph") \
+                as mock_preexec, \
+                unittest.mock.patch("activator.middleware_interface.SeparablePipelineExecutor.run_pipeline") \
+                as mock_run:
             with self.assertLogs(self.logger_name, level="INFO") as logs:
                 self.interface.run_pipeline(self.next_visit, {1})
-        mock_run.assert_called_once_with(register_dataset_types=True)
+        mock_preexec.assert_called_once()
+        # Pre-execution may have other arguments as needed; no requirement either way.
+        self.assertEqual(mock_preexec.call_args.kwargs["register_dataset_types"], True)
+        mock_run.assert_called_once()
         # Check that we configured the right pipeline.
         self.assertIn("End to end Alert Production pipeline specialized for HiTS-2015",
                       "\n".join(logs.output))
@@ -600,10 +617,11 @@ def setUp(self):
         instrument = "lsst.obs.decam.DarkEnergyCamera"
         data_dir = os.path.join(os.path.abspath(os.path.dirname(__file__)), "data")
         self.input_data = os.path.join(data_dir, "input_data")
-        workspace = tempfile.TemporaryDirectory()
+
+        local_repo = make_local_repo(tempfile.gettempdir(), central_butler, instname)
         # TemporaryDirectory warns on leaks; addCleanup also keeps the TD from
         # getting garbage-collected.
-        self.addCleanup(tempfile.TemporaryDirectory.cleanup, workspace)
+        self.addCleanup(tempfile.TemporaryDirectory.cleanup, local_repo)

         # coordinates from DECam data in ap_verify_ci_hits2015 for visit 411371
         ra = 155.4702849608958
@@ -631,10 +649,8 @@ def setUp(self):

         # Populate repository.
         self.interface = MiddlewareInterface(central_butler, self.input_data, instrument,
-                                             skymap_name, workspace.name,
+                                             skymap_name, local_repo.name,
                                              prefix="file://")
-        # TODO: should MiddlewareInterface have a cleanup method?
-        self.addCleanup(tempfile.TemporaryDirectory.cleanup, self.interface._repo)
         self.interface.prep_butler(self.next_visit)
         filename = "fakeRawImage.fits"
         filepath = os.path.join(self.input_data, filename)
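
For reference, a minimal sketch (not part of the patch) of how the process-level and per-visit pieces are intended to compose after this change. The function name handle_one_visit and the configuration values (calib_repo, image_bucket, skymap, local_repos) are hypothetical stand-ins for the environment-derived globals in activator.py; error handling and the Flask handler wiring are omitted. Only the make_local_repo/MiddlewareInterface calls mirror the code in this diff.

    from activator.middleware_interface import get_central_butler, make_local_repo, MiddlewareInterface

    # Placeholder configuration; activator.py reads these from the environment.
    calib_repo = "/repo/central"      # hypothetical central repo path/URI
    image_bucket = "rubin-images"     # hypothetical image bucket name
    instrument_name = "LsstCam"
    skymap = "example_skymap"         # hypothetical skymap name
    local_repos = "/tmp"

    # Done once per worker process: the central butler and the local repo
    # (a tempfile.TemporaryDirectory) share the lifetime of the process.
    central_butler = get_central_butler(calib_repo, instrument_name)
    local_repo = make_local_repo(local_repos, central_butler, instrument_name)


    def handle_one_visit(visit, exposure_ids, filenames):
        """Process one group-detector combination with a fresh MiddlewareInterface."""
        # A new object per visit avoids cross-talk between processing runs,
        # while every object shares the same process-wide local repo.
        mwi = MiddlewareInterface(central_butler, image_bucket, instrument_name,
                                  skymap, local_repo.name)
        mwi.prep_butler(visit)                     # copy calibs/templates for this detector
        for filename in filenames:
            mwi.ingest_image(visit, filename)      # ingest raws into the shared local repo
        mwi.run_pipeline(visit, exposure_ids)      # now driven by SeparablePipelineExecutor
        mwi.export_outputs(visit, exposure_ids)    # copy results back to the central repo
        mwi.clean_local_repo(visit, exposure_ids)  # leave the repo clean for the next visit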