diff --git a/doc/playbook.rst b/doc/playbook.rst index 5f42e036..daf0d196 100644 --- a/doc/playbook.rst +++ b/doc/playbook.rst @@ -99,8 +99,8 @@ The bucket ``rubin-pp`` holds incoming raw images. The bucket ``rubin-pp-users`` holds: -* The central repository described in `DMTN-219`_. - This repository currently contains a copy of HSC data `ap_verify_ci_cosmos_pdr2/preloaded@u/kfindeisen/DM-35052-expansion `_. +* ``rubin-pp-users/central_repo/`` contains the central repository described in `DMTN-219`_. + This repository currently contains a copy of HSC RC2 data, uploaded with ``make_hsc_rc2_export.py`` and ``make_template_export``. * ``rubin-pp-users/unobserved/`` contains raw files that the upload script(s) can draw from to create incoming raws. @@ -200,7 +200,7 @@ USDF The service can be controlled with ``kubectl`` from ``rubin-devl``. You must first `get credentials for the development cluster `_ on the web; ignore the installation instructions and copy the commands from the second box. -Credentials are good for roughly one work day. +Credentials must be renewed if you get a "cannot fetch token: 400 Bad Request" error when running ``kubectl``. Each time the service container is updated, a new revision of the service should be edited and deployed. (Continuous deployment has not yet been set up.) @@ -320,11 +320,12 @@ Run: and look up the ``EXTERNAL-IP``; set ``KAFKA_CLUSTER=:9094``. The IP address is fixed, so you should only need to look it up once. -Install the prototype code: +Install the prototype code, and set it up before use: .. code-block:: sh git clone https://github.com/lsst-dm/prompt_prototype + setup -r prompt_prototype The tester scripts send ``next_visit`` events for each detector via Kafka on the ``next-visit-topic`` topic. They then upload a batch of files representing the snaps of the visit to the ``rubin-pp`` S3 bucket, simulating incoming raw images. diff --git a/python/activator/activator.py b/python/activator/activator.py index 0cf524c4..117f0462 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -296,6 +296,8 @@ def next_visit_handler() -> Tuple[str, int]: # TODO: broadcast alerts here # TODO: call export_outputs on success or permanent failure in DM-34141 mwi.export_outputs(expected_visit, expid_set) + # Clean only if export successful. + mwi.clean_local_repo(expected_visit, expid_set) return "Pipeline executed", 200 else: _log.error(f"Timed out waiting for images for {expected_visit}.") diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index cf2d0af4..19d3a42d 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -22,6 +22,8 @@ __all__ = ["get_central_butler", "MiddlewareInterface"] import collections.abc +import datetime +import hashlib import itertools import logging import os @@ -129,19 +131,20 @@ class MiddlewareInterface: # self._apdb_uri is a valid URI that unambiguously identifies the APDB # self.image_host is a valid URI with non-empty path and no query or fragment. # self._download_store is None if and only if self.image_host is a local URI. - # self.instrument, self.camera, and self.skymap do not change after __init__. + # self.instrument, self.camera, self.skymap, self._deployment do not change + # after __init__. # self._repo is the only reference to its TemporaryDirectory object. 
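The playbook hunk above describes the tester workflow only in prose (one ``next_visit`` event per detector on the ``next-visit-topic`` topic, followed by raw uploads to the ``rubin-pp`` bucket). Below is a minimal sketch of that flow, not the actual tester scripts: the use of ``confluent_kafka`` and ``boto3``, the JSON payload, and all field values and object keys are illustrative assumptions.

.. code-block:: python

    # Illustrative sketch only -- use the tester scripts in prompt_prototype for
    # real runs. KAFKA_CLUSTER is assumed to be set as described above; the
    # payload fields and S3 key are placeholders, not the service's real schema.
    import json
    import os

    import boto3
    from confluent_kafka import Producer

    producer = Producer({"bootstrap.servers": os.environ["KAFKA_CLUSTER"]})
    s3 = boto3.client("s3")

    # One next_visit event per detector.
    event = {"instrument": "HSC", "detector": 0, "groupId": "2023-01-23T23:33:14.700"}
    producer.produce("next-visit-topic", value=json.dumps(event).encode("utf-8"))
    producer.flush()

    # Then upload the matching snap(s) to simulate an incoming raw image.
    s3.upload_file("local_snap.fits", "rubin-pp",
                   "HSC/0/2023-01-23T23:33:14.700/snap0.fits")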
# self.butler defaults to a chained collection named # self.output_collection, which contains zero or more output runs, # pre-made inputs, and raws, in that order. However, self.butler is not # guaranteed to contain concrete data, or even the dimensions # corresponding to self.camera and self.skymap. - # if it exists, the latest run in self.output_collection is always the first - # in the chain. def __init__(self, central_butler: Butler, image_bucket: str, instrument: str, local_storage: str, prefix: str = "s3://"): + # Deployment/version ID -- potentially expensive to generate. + self._deployment = self._get_deployment() self._apdb_uri = self._make_apdb_uri() self._apdb_namespace = os.environ.get("NAMESPACE_APDB", None) self.central_butler = central_butler @@ -176,6 +179,27 @@ def __init__(self, central_butler: Butler, image_bucket: str, instrument: str, # How much to pad the refcat region we will copy over. self.padding = 30*lsst.geom.arcseconds + def _get_deployment(self): + """Get a unique version ID of the active stack and pipeline configuration(s). + + Returns + ------- + version : `str` + A unique version identifier for the stack. Contains only + word characters and "-", but not guaranteed to be human-readable. + """ + try: + # Defined by Knative in containers, guaranteed to be unique for + # each deployment. Currently of the form prompt-proto-service-#####. + return os.environ["K_REVISION"] + except KeyError: + # If not in a container, read the active Science Pipelines install. + packages = lsst.utils.packages.Packages.fromSystem() # Takes several seconds! + h = hashlib.md5(usedforsecurity=False) + for package, version in packages.items(): + h.update(bytes(package + version, encoding="utf-8")) + return f"local-{h.hexdigest()}" + def _make_apdb_uri(self): """Generate a URI for accessing the APDB. """ @@ -209,7 +233,7 @@ def _init_local_butler(self, base_path: str): CollectionType.CHAINED) # Will be populated on ingest. butler.registry.registerCollection(self.instrument.makeDefaultRawIngestRunName(), - CollectionType.CHAINED) + CollectionType.RUN) # Will be populated on pipeline execution. butler.registry.registerCollection(self.output_collection, CollectionType.CHAINED) collections = [self.instrument.makeUmbrellaCollectionName(), @@ -357,14 +381,6 @@ def prep_butler(self, visit: Visit) -> None: directory=self.central_butler.datastore.root, transfer="copy") - # Create unique run to store this visit's raws. This makes it easier to - # clean up the central repo later. - # TODO: not needed after DM-36051, or if we find a way to give all test - # raws unique exposure IDs. - input_raws = self._get_raw_run_name(visit) - self.butler.registry.registerCollection(input_raws, CollectionType.RUN) - _prepend_collection(self.butler, self.instrument.makeDefaultRawIngestRunName(), [input_raws]) - def _export_refcats(self, export, center, radius): """Export the refcats for this visit from the central butler. @@ -506,53 +522,95 @@ def get_key(ref): for k, g in itertools.groupby(ordered, key=get_key): yield k, len(list(g)) - def _get_raw_run_name(self, visit): - """Define a run collection specific to a particular visit. + def _get_init_output_run(self, + visit: Visit, + date: datetime.date = datetime.datetime.now(datetime.timezone.utc)) -> str: + """Generate a deterministic init-output collection name that avoids + configuration conflicts. + + Parameters + ---------- + visit : Visit + Group of snaps whose processing goes into the run. 
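``_get_deployment`` above falls back to hashing the active package set when ``K_REVISION`` is not defined. The following is a standalone sketch of that fingerprinting idea, with a plain dict standing in for ``lsst.utils.packages.Packages`` (an assumption for illustration; sorting is added here for determinism and is not part of the method above).

.. code-block:: python

    # Minimal sketch of the "local-<hash>" fallback; a plain dict stands in for
    # lsst.utils.packages.Packages. Items are sorted so the fingerprint is
    # deterministic regardless of insertion order.
    import hashlib

    def fingerprint(packages: dict[str, str]) -> str:
        h = hashlib.md5(usedforsecurity=False)
        for name, version in sorted(packages.items()):
            h.update(f"{name}{version}".encode("utf-8"))
        return f"local-{h.hexdigest()}"

    # Identical stacks always map to the same identifier, so reruns share runs.
    print(fingerprint({"lsst_distrib": "w_2023_05", "prompt_prototype": "g1234abc"}))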
+ date : `datetime.date` + Date of the processing run (not observation!) + + Returns + ------- + run : `str` + The run in which to place pipeline init-outputs. + """ + # Current executor requires that init-outputs be in the same run as + # outputs. This can be changed once DM-36162 is done. + return self._get_output_run(visit, date) + + def _get_output_run(self, + visit: Visit, + date: datetime.date = datetime.datetime.now(datetime.timezone.utc)) -> str: + """Generate a deterministic collection name that avoids version or + provenance conflicts. Parameters ---------- visit : Visit - The visit whose raws will be stored in this run. + Group of snaps whose processing goes into the run. + date : `datetime.date` + Date of the processing run (not observation!) Returns ------- run : `str` - The name of a run collection to use for raws. + The run in which to place processing outputs. """ - return self.instrument.makeCollectionName("raw", visit.groupId) + pipeline_name, _ = os.path.splitext(os.path.basename(self._get_pipeline_file(visit))) + # Order optimized for S3 bucket -- filter out as many files as soon as possible. + return self.instrument.makeCollectionName( + "prompt", f"output-{date:%Y-%m-%d}", pipeline_name, self._deployment) - def _prep_collections(self): + def _prep_collections(self, visit: Visit): """Pre-register output collections in advance of running the pipeline. + Parameters + ---------- + visit : Visit + Group of snaps needing an output run. + Returns ------- run : `str` The name of a new run collection to use for outputs. The run name is prefixed by ``self.output_collection``. """ - # NOTE: Because we receive a butler on init, we can't use this - # prep_butler() because it takes a repo path. - # butler = SimplePipelineExecutor.prep_butler( - # self.repo, - # inputs=[self.calibration_collection, - # self.instrument.makeDefaultRawIngestRunName(), - # 'refcats'], - # output=self.output_collection) - # The below is taken from SimplePipelineExecutor.prep_butler. - # TODO: currently the run **must** be unique for each unit of - # processing. It must be unique per group because in the prototype the - # same exposures may be rerun under different group IDs, and they must - # be unique per detector because the same worker may be tasked with - # different detectors from the same group. Replace with a single - # universal run on DM-36586. - output_run = f"{self.output_collection}/{self.instrument.makeCollectionTimestamp()}" + output_run = self._get_output_run(visit) self.butler.registry.registerCollection(output_run, CollectionType.RUN) + # As of Feb 2023, this moves output_run to the front of the chain if + # it's already present, but this behavior cannot be relied upon. _prepend_collection(self.butler, self.output_collection, [output_run]) - # TODO: remove after DM-36162 - self._clean_unsafe_datasets(self.butler, output_run) return output_run - def _prep_pipeline(self, visit: Visit) -> None: + def _get_pipeline_file(self, visit: Visit) -> str: + """Identify the pipeline to be run, based on the configured instrument + and details of the visit. + + Parameters + ---------- + visit : Visit + Group of snaps from one detector to prepare the pipeline for. + + Returns + ------- + pipeline : `str` + A path to a configured pipeline file. + """ + # TODO: We hacked the basepath in the Dockerfile so this works both in + # development and in service container, but it would be better if there + # were a path that's valid in both. 
+ return os.path.join(getPackageDir("prompt_prototype"), + "pipelines", + visit.instrument, + "ApPipe.yaml") + + def _prep_pipeline(self, visit: Visit) -> lsst.pipe.base.Pipeline: """Setup the pipeline to be run, based on the configured instrument and details of the incoming visit. @@ -572,11 +630,7 @@ def _prep_pipeline(self, visit: Visit) -> None: Raised if there is no AP pipeline file for this configuration. TODO: could be a good case for a custom exception here. """ - # TODO: We hacked the basepath in the Dockerfile so this works both in - # development and in service container, but it would be better if there - # were a path that's valid in both. - ap_pipeline_file = os.path.join(getPackageDir("prompt_prototype"), - "pipelines", self.instrument.getName(), "ApPipe.yaml") + ap_pipeline_file = self._get_pipeline_file(visit) try: pipeline = lsst.pipe.base.Pipeline.fromFile(ap_pipeline_file) except FileNotFoundError: @@ -628,7 +682,7 @@ def ingest_image(self, visit: Visit, oid: str) -> None: if not file.isLocal: # TODO: RawIngestTask doesn't currently support remote files. file = self._download(file) - result = self.rawIngestTask.run([file], run=self._get_raw_run_name(visit)) + result = self.rawIngestTask.run([file]) # We only ingest one image at a time. # TODO: replace this assert with a custom exception, once we've decided # how we plan to handle exceptions in this code. @@ -659,16 +713,17 @@ def run_pipeline(self, visit: Visit, exposure_ids: set[int]) -> None: # TODO: a good place for a custom exception? raise RuntimeError("No data to process.") from e - output_run = self._prep_collections() + output_run = self._prep_collections(visit) where = f"instrument='{visit.instrument}' and detector={visit.detector} " \ f"and exposure in ({','.join(str(x) for x in exposure_ids)})" pipeline = self._prep_pipeline(visit) + output_run_butler = Butler(butler=self.butler, + collections=(self._get_init_output_run(visit), ) + self.butler.collections, + run=output_run) executor = SimplePipelineExecutor.from_pipeline(pipeline, where=where, - butler=Butler(butler=self.butler, - collections=self.butler.collections, - run=output_run)) + butler=output_run_butler) if len(executor.quantum_graph) == 0: # TODO: a good place for a custom exception? raise RuntimeError("No data to process.") @@ -683,9 +738,6 @@ def export_outputs(self, visit: Visit, exposure_ids: set[int]) -> None: """Copy raws and pipeline outputs from processing a set of images back to the central Butler. - The copied raws can be found in the collection ``/raw/all``, - while the outputs can be found in ``"/prompt-results"``. - Parameters ---------- visit : Visit @@ -695,48 +747,19 @@ def export_outputs(self, visit: Visit, exposure_ids: set[int]) -> None: """ # TODO: this method will not be responsible for raws after DM-36051. self._export_subset(visit, exposure_ids, "raw", - in_collections=self._get_raw_run_name(visit), - out_collection=self.instrument.makeDefaultRawIngestRunName()) - - umbrella = self.instrument.makeCollectionName("prompt-results") - latest_run = self.butler.registry.getCollectionChain(self.output_collection)[0] - if latest_run.startswith(self.output_collection + "/"): - self._export_subset(visit, exposure_ids, - # TODO: find a way to merge datasets like *_config - # or *_schema that are duplicated across multiple - # workers. 
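To make the new collection layout concrete, here is a standalone sketch of the name produced by ``_get_output_run``; ``Instrument.makeCollectionName`` is approximated by plain ``/``-joining, and the instrument, pipeline path, and deployment ID are example values.

.. code-block:: python

    # Sketch of the deterministic run-naming scheme; makeCollectionName() is
    # approximated by "/"-joining the instrument name with the remaining parts.
    import datetime
    import os

    def output_run_name(instrument: str, pipeline_file: str, deployment: str,
                        date: datetime.date) -> str:
        pipeline_name, _ = os.path.splitext(os.path.basename(pipeline_file))
        return "/".join([instrument, "prompt", f"output-{date:%Y-%m-%d}",
                         pipeline_name, deployment])

    print(output_run_name("DECam", "pipelines/DECam/ApPipe.yaml",
                          "prompt-proto-service-042", datetime.date(2023, 2, 1)))
    # DECam/prompt/output-2023-02-01/ApPipe/prompt-proto-service-042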
- self._get_safe_dataset_types(self.butler), - in_collections=latest_run, - out_collection=umbrella) - _log.info(f"Pipeline products saved to collection '{umbrella}' for " - f"detector {visit.detector} of {exposure_ids}.") - else: - _log.warning(f"No output runs to save! Called for {visit.detector} of {exposure_ids}.") - - @staticmethod - def _clean_unsafe_datasets(butler, run: str): - """Remove datasets that potentially conflict with runs from other - exposures or detectors. - - Parameters - ---------- - butler : `lsst.daf.butler.Butler` - The butler from which to remove datasets. - run : `str` - The run from which to remove datasets. The method does **not** check - that this is a run and not, for example, a chained collection. - """ - all_types = set(butler.registry.queryDatasetTypes(...)) - unsafe_types = all_types - set(MiddlewareInterface._get_safe_dataset_types(butler)) - unsafe_datasets = list(butler.registry.queryDatasets(unsafe_types, collections=run)) - _log.debug("Removing %d unsafe datasets of types %s from '%s'.", - len(unsafe_datasets), - {t.name for t in unsafe_types}, - run) - butler.pruneDatasets(unsafe_datasets, - # Need purge=True to remove from runs. - purge=True, disassociate=True, unstore=True, - ) + in_collections=self.instrument.makeDefaultRawIngestRunName(), + ) + + latest_run = self._get_output_run(visit) + self._export_subset(visit, exposure_ids, + # TODO: find a way to merge datasets like *_config + # or *_schema that are duplicated across multiple + # workers. + self._get_safe_dataset_types(self.butler), + in_collections=latest_run, + ) + _log.info(f"Pipeline products saved to collection '{latest_run}' for " + f"detector {visit.detector} of {exposure_ids}.") @staticmethod def _get_safe_dataset_types(butler): @@ -756,7 +779,7 @@ def _get_safe_dataset_types(butler): if "detector" in dstype.dimensions] def _export_subset(self, visit: Visit, exposure_ids: set[int], - dataset_types: typing.Any, in_collections: typing.Any, out_collection: str) -> None: + dataset_types: typing.Any, in_collections: typing.Any) -> None: """Copy datasets associated with a processing run back to the central Butler. @@ -772,9 +795,6 @@ def _export_subset(self, visit: Visit, exposure_ids: set[int], in_collections The collections to transfer from; can be any expression described in :ref:`daf_butler_collection_expressions`. - out_collection : `str` - The chained collection in which to include the datasets. Need not - exist before the call. """ try: # Need to iterate over datasets at least twice, so list. @@ -811,12 +831,32 @@ def _export_subset(self, visit: Visit, exposure_ids: set[int], skip_dimensions={"instrument", "detector", "skymap", "tract", "patch"}, transfer="copy") - # No-op if collection already exists. - self.central_butler.registry.registerCollection(out_collection, CollectionType.CHAINED) - runs = {ref.run for ref in datasets} - # Don't unlink any previous runs. - # TODO: need to secure this against concurrent modification - _prepend_collection(self.central_butler, out_collection, runs) + + def clean_local_repo(self, visit: Visit, exposure_ids: set[int]) -> None: + """Remove local repo content that is only needed for a single visit. + + This includes raws and pipeline outputs. + + Parameters + ---------- + visit : Visit + The visit to be removed. + exposure_ids : `set` [`int`] + Identifiers of the exposures to be removed.
+ """ + raws = self.butler.registry.queryDatasets( + 'raw', + collections=self.instrument.makeDefaultRawIngestRunName(), + where=f"exposure in ({', '.join(str(x) for x in exposure_ids)})", + instrument=visit.instrument, + detector=visit.detector, + ) + self.butler.pruneDatasets(raws, disassociate=True, unstore=True, purge=True) + # Outputs are all in one run, so just drop it. + output_run = self._get_output_run(visit) + for chain in self.butler.registry.getCollectionParentChains(output_run): + _remove_from_chain(self.butler, chain, [output_run]) + self.butler.removeRuns([output_run], unstore=True) def _query_missing_datasets(src_repo: Butler, dest_repo: Butler, @@ -864,8 +904,8 @@ def _prepend_collection(butler: Butler, chain: str, new_collections: collections The butler in which the collections exist. chain : `str` The chained collection to prepend to. - new_collections : iterable [`str`] - The collections to prepend to ``chain``. + new_collections : sequence [`str`] + The collections to prepend to ``chain``, in order. Notes ----- @@ -873,3 +913,27 @@ def _prepend_collection(butler: Butler, chain: str, new_collections: collections """ old_chain = butler.registry.getCollectionChain(chain) # May be empty butler.registry.setCollectionChain(chain, list(new_collections) + list(old_chain), flatten=False) + + +def _remove_from_chain(butler: Butler, chain: str, old_collections: collections.abc.Iterable[str]) -> None: + """Remove a specific collection from a chain. + + This function has no effect if the collection is not in the chain. + + Parameters + ---------- + butler : `lsst.daf.butler.Butler` + The butler in which the collections exist. + chain : `str` + The chained collection to remove from. + old_collections : iterable [`str`] + The collections to remove from ``chain``. + + Notes + ----- + This function is not safe against concurrent modifications to ``chain``. + """ + contents = list(butler.registry.getCollectionChain(chain)) + for old in set(old_collections).intersection(contents): + contents.remove(old) + butler.registry.setCollectionChain(chain, contents, flatten=False) diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index 65466590..ee1e5e35 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -20,9 +20,9 @@ # along with this program. If not, see . import dataclasses +import datetime import itertools import tempfile -import time import os.path import unittest import unittest.mock @@ -42,7 +42,7 @@ from activator.visit import Visit from activator.middleware_interface import get_central_butler, MiddlewareInterface, \ - _query_missing_datasets, _prepend_collection + _query_missing_datasets, _prepend_collection, _remove_from_chain # The short name of the instrument used in the test repo. instname = "DECam" @@ -69,25 +69,29 @@ def fake_file_data(filename, dimensions, instrument, visit): data_id, file_data, : `DataCoordinate`, `RawFileData` The id and descriptor for the mock file. 
""" - data_id = DataCoordinate.standardize({"exposure": 1, + exposure_id = int(visit.groupId) + data_id = DataCoordinate.standardize({"exposure": exposure_id, "detector": visit.detector, "instrument": instrument.getName()}, universe=dimensions) - time = astropy.time.Time("2015-02-18T05:28:18.716517500", scale="tai") + start_time = astropy.time.Time("2015-02-18T05:28:18.716517500", scale="tai") obs_info = astro_metadata_translator.makeObservationInfo( instrument=instrument.getName(), - datetime_begin=time, - datetime_end=time + 30*u.second, - exposure_id=1, - visit_id=1, + datetime_begin=start_time, + datetime_end=start_time + 30*u.second, + exposure_id=exposure_id, + visit_id=exposure_id, boresight_rotation_angle=astropy.coordinates.Angle(visit.cameraAngle*u.degree), boresight_rotation_coord=visit.rotationSystem.name.lower(), tracking_radec=astropy.coordinates.SkyCoord(*visit.position, frame="icrs", unit="deg"), - observation_id="1", + observation_id=visit.groupId, physical_filter=filter, exposure_time=30.0*u.second, - observation_type="science") + observation_type="science", + group_counter_start=exposure_id, + group_counter_end=exposure_id, + ) dataset_info = RawFileDatasetInfo(data_id, obs_info) file_data = RawFileData([dataset_info], lsst.resources.ResourcePath(filename), @@ -105,6 +109,7 @@ def setUpClass(cls): {"IP_APDB": "localhost", "DB_APDB": "postgres", "USER_APDB": "postgres", + "K_REVISION": "prompt-proto-service-042", }) cls.env_patcher.start() @@ -368,6 +373,19 @@ def test_run_pipeline_empty_quantum_graph(self): with self.assertRaisesRegex(RuntimeError, "No data to process"): self.interface.run_pipeline(self.next_visit, {2}) + def test_get_output_run(self): + for date in [datetime.date.today(), datetime.datetime.today()]: + out_run = self.interface._get_output_run(self.next_visit, date) + self.assertEqual(out_run, + f"{instname}/prompt/output-{date.year:04d}-{date.month:02d}-{date.day:02d}" + "/ApPipe/prompt-proto-service-042" + ) + init_run = self.interface._get_init_output_run(self.next_visit, date) + self.assertEqual(init_run, + f"{instname}/prompt/output-{date.year:04d}-{date.month:02d}-{date.day:02d}" + "/ApPipe/prompt-proto-service-042" + ) + def _assert_in_collection(self, butler, collection, dataset_type, data_id): # Pass iff any dataset matches the query, no need to check them all. for dataset in butler.registry.queryDatasets(dataset_type, collections=collection, dataId=data_id): @@ -379,18 +397,12 @@ def _assert_not_in_collection(self, butler, collection, dataset_type, data_id): for dataset in butler.registry.queryDatasets(dataset_type, collections=collection, dataId=data_id): self.fail(f"{dataset} matches {dataset_type}@{data_id} in {collection}.") - def test_clean_unsafe_datasets(self): - """Test that _clean_unsafe_datasets removes only "conflicting" datasets. - - A conflicting dataset is one that can be produced with the same data ID - in different AP runs. This test currently uses ``calibrate_config`` and - ``src_schema`` as examples of conflicting datasets, and ``src`` and - ``calexp`` as counterexamples (which must not be removed). + def test_clean_local_repo(self): + """Test that clean_local_repo removes old datasets from the datastore. """ - butler = butler_tests.makeTestCollection(self.interface.butler, uniqueId=self.id()) - run = butler.run # Safe to define custom dataset types and IDs, because the repository # is regenerated for each test. 
+ butler = self.interface.butler raw_data_id, _ = fake_file_data("foo.bar", butler.dimensions, self.interface.instrument, @@ -398,28 +410,31 @@ def test_clean_unsafe_datasets(self): processed_data_id = {(k if k != "exposure" else "visit"): v for k, v in raw_data_id.items()} butler_tests.addDataIdValue(butler, "exposure", raw_data_id["exposure"]) butler_tests.addDataIdValue(butler, "visit", processed_data_id["visit"]) - butler_tests.addDatasetType(butler, "src_schema", set(), "SourceCatalog") - butler_tests.addDatasetType(butler, "calibrate_config", set(), "Config") - butler_tests.addDatasetType(butler, "src", {"instrument", "visit", "detector"}, "SourceCatalog") - butler_tests.addDatasetType(butler, "calexp", {"instrument", "visit", "detector"}, "ExposureF") + butler_tests.addDatasetType(butler, "raw", raw_data_id.keys(), "Exposure") + butler_tests.addDatasetType(butler, "src", processed_data_id.keys(), "SourceCatalog") + butler_tests.addDatasetType(butler, "calexp", processed_data_id.keys(), "ExposureF") - conf = lsst.pex.config.Config() exp = lsst.afw.image.ExposureF(20, 20) cat = lsst.afw.table.SourceCatalog() - butler.put(conf, "calibrate_config", processed_data_id, run=run) - butler.put(cat, "src_schema", processed_data_id, run=run) - butler.put(cat, "src", processed_data_id, run=run) - butler.put(exp, "calexp", processed_data_id, run=run) - self._assert_in_collection(butler, run, "calibrate_config", processed_data_id) - self._assert_in_collection(butler, run, "src_schema", processed_data_id) - self._assert_in_collection(butler, run, "src", processed_data_id) - self._assert_in_collection(butler, run, "calexp", processed_data_id) - - MiddlewareInterface._clean_unsafe_datasets(butler, run) - self._assert_not_in_collection(butler, run, "calibrate_config", processed_data_id) - self._assert_not_in_collection(butler, run, "src_schema", processed_data_id) - self._assert_in_collection(butler, run, "src", processed_data_id) - self._assert_in_collection(butler, run, "calexp", processed_data_id) + raw_collection = self.interface.instrument.makeDefaultRawIngestRunName() + butler.registry.registerCollection(raw_collection, CollectionType.RUN) + out_collection = self.interface._get_output_run(self.next_visit) + butler.registry.registerCollection(out_collection, CollectionType.RUN) + chain = "generic-chain" + butler.registry.registerCollection(chain, CollectionType.CHAINED) + butler.registry.setCollectionChain(chain, [out_collection, raw_collection]) + + butler.put(exp, "raw", raw_data_id, run=raw_collection) + butler.put(cat, "src", processed_data_id, run=out_collection) + butler.put(exp, "calexp", processed_data_id, run=out_collection) + self._assert_in_collection(butler, "*", "raw", raw_data_id) + self._assert_in_collection(butler, "*", "src", processed_data_id) + self._assert_in_collection(butler, "*", "calexp", processed_data_id) + + self.interface.clean_local_repo(self.next_visit, {raw_data_id["exposure"]}) + self._assert_not_in_collection(butler, "*", "raw", raw_data_id) + self._assert_not_in_collection(butler, "*", "src", processed_data_id) + self._assert_not_in_collection(butler, "*", "calexp", processed_data_id) def test_query_missing_datasets(self): """Test that query_missing_datasets provides the correct values. 
@@ -483,6 +498,23 @@ def test_prepend_collection(self): self.assertEqual(list(butler.registry.getCollectionChain("_prepend_base")), ["_prepend3", "_prepend1", "_prepend2"]) + def test_remove_from_chain(self): + butler = self.interface.butler + butler.registry.registerCollection("_remove1", CollectionType.TAGGED) + butler.registry.registerCollection("_remove2", CollectionType.TAGGED) + butler.registry.registerCollection("_remove33", CollectionType.TAGGED) + butler.registry.registerCollection("_remove_base", CollectionType.CHAINED) + + # Empty chain. + self.assertEqual(list(butler.registry.getCollectionChain("_remove_base")), []) + _remove_from_chain(butler, "_remove_base", ["_remove1"]) + self.assertEqual(list(butler.registry.getCollectionChain("_remove_base")), []) + + # Non-empty chain. + butler.registry.setCollectionChain("_remove_base", ["_remove1", "_remove2"]) + _remove_from_chain(butler, "_remove_base", ["_remove2", "_remove3"]) + self.assertEqual(list(butler.registry.getCollectionChain("_remove_base")), ["_remove1"]) + class MiddlewareInterfaceWriteableTest(unittest.TestCase): """Test the MiddlewareInterface class with faked data. @@ -499,6 +531,7 @@ def setUpClass(cls): {"IP_APDB": "localhost", "DB_APDB": "postgres", "USER_APDB": "postgres", + "K_REVISION": "prompt-proto-service-042", }) cls.env_patcher.start() @@ -555,7 +588,7 @@ def setUp(self): dec = -4.950050405424033 # DECam has no rotator; instrument angle is 90 degrees in our system. rot = 90. - self.next_visit = Visit(instrument=instrument, + self.next_visit = Visit(instrument=instname, detector=56, groupId="1", nimages=1, @@ -586,57 +619,66 @@ def setUp(self): self.interface.butler.dimensions, self.interface.instrument, self.next_visit) + self.second_data_id, second_file_data = fake_file_data(filepath, + self.interface.butler.dimensions, + self.interface.instrument, + self.second_visit) + with unittest.mock.patch.object(self.interface.rawIngestTask, "extractMetadata") as mock: mock.return_value = file_data self.interface.ingest_image(self.next_visit, filename) + mock.return_value = second_file_data self.interface.ingest_image(self.second_visit, filename) - self.interface.define_visits.run([self.raw_data_id]) + self.interface.define_visits.run([self.raw_data_id, self.second_data_id]) self._simulate_run() def _simulate_run(self): """Create a mock pipeline execution that stores a calexp for self.raw_data_id. - - Returns - ------- - run : `str` - The output run containing the output. """ exp = lsst.afw.image.ExposureF(20, 20) - run = self.interface._prep_collections() + run1 = self.interface._prep_collections(self.next_visit) self.processed_data_id = {(k if k != "exposure" else "visit"): v for k, v in self.raw_data_id.items()} + run2 = self.interface._prep_collections(self.second_visit) + self.second_processed_data_id = {(k if k != "exposure" else "visit"): v + for k, v in self.second_data_id.items()} # Dataset types defined for local Butler on pipeline run, but no # guarantee this happens in central Butler. 
butler_tests.addDatasetType(self.interface.butler, "calexp", {"instrument", "visit", "detector"}, "ExposureF") - self.interface.butler.put(exp, "calexp", self.processed_data_id, run=run) - return run + self.interface.butler.put(exp, "calexp", self.processed_data_id, run=run1) + self.interface.butler.put(exp, "calexp", self.second_processed_data_id, run=run2) - def _check_datasets(self, butler, types, collections, count, data_id): - datasets = list(butler.registry.queryDatasets(types, collections=collections)) - self.assertEqual(len(datasets), count) - for dataset in datasets: - self.assertEqual(dataset.dataId, data_id) + def _count_datasets(self, butler, types, collections): + return len(set(butler.registry.queryDatasets(types, collections=collections))) + + def _count_datasets_with_id(self, butler, types, collections, data_id): + return len(set(butler.registry.queryDatasets(types, collections=collections, dataId=data_id))) def test_export_outputs(self): self.interface.export_outputs(self.next_visit, {self.raw_data_id["exposure"]}) central_butler = Butler(self.central_repo.name, writeable=False) raw_collection = f"{instname}/raw/all" - export_collection = f"{instname}/prompt-results" - self._check_datasets(central_butler, - "raw", raw_collection, 1, self.raw_data_id) - # Did not export raws directly to raw/all. - self.assertEqual(central_butler.registry.getCollectionType(raw_collection), CollectionType.CHAINED) - self._check_datasets(central_butler, - "calexp", export_collection, 1, self.processed_data_id) + date = datetime.datetime.now(datetime.timezone.utc) + export_collection = f"{instname}/prompt/output-{date.year:04d}-{date.month:02d}-{date.day:02d}" \ + "/ApPipe/prompt-proto-service-042" + self.assertEqual(self._count_datasets(central_butler, "raw", raw_collection), 1) + self.assertEqual( + self._count_datasets_with_id(central_butler, "raw", raw_collection, self.raw_data_id), + 1) + self.assertEqual(self._count_datasets(central_butler, "calexp", export_collection), 1) + self.assertEqual( + self._count_datasets_with_id(central_butler, "calexp", export_collection, self.processed_data_id), + 1) # Did not export calibs or other inputs. - self._check_datasets(central_butler, - ["cpBias", "gaia", "skyMap", "*Coadd"], export_collection, - 0, {"error": "dnc"}) + self.assertEqual( + self._count_datasets(central_butler, ["cpBias", "gaia", "skyMap", "*Coadd"], export_collection), + 0) # Nothing placed in "input" collections. - self._check_datasets(central_butler, - ["raw", "calexp"], f"{instname}/defaults", 0, {"error": "dnc"}) + self.assertEqual( + self._count_datasets(central_butler, ["raw", "calexp"], f"{instname}/defaults"), + 0) def test_export_outputs_bad_visit(self): bad_visit = dataclasses.replace(self.next_visit, detector=88) @@ -649,24 +691,33 @@ def test_export_outputs_bad_exposure(self): def test_export_outputs_retry(self): self.interface.export_outputs(self.next_visit, {self.raw_data_id["exposure"]}) - - time.sleep(1.0) # Force _simulate_run() to create a new timestamped run - self._simulate_run() - self.interface.export_outputs(self.second_visit, {self.raw_data_id["exposure"]}) + self.interface.export_outputs(self.second_visit, {self.second_data_id["exposure"]}) central_butler = Butler(self.central_repo.name, writeable=False) raw_collection = f"{instname}/raw/all" - export_collection = f"{instname}/prompt-results" - self._check_datasets(central_butler, - "raw", raw_collection, 2, self.raw_data_id) - # Did not export raws directly to raw/all. 
- self.assertEqual(central_butler.registry.getCollectionType(raw_collection), CollectionType.CHAINED) - self._check_datasets(central_butler, - "calexp", export_collection, 2, self.processed_data_id) + date = datetime.datetime.now(datetime.timezone.utc) + export_collection = f"{instname}/prompt/output-{date.year:04d}-{date.month:02d}-{date.day:02d}" \ + "/ApPipe/prompt-proto-service-042" + self.assertEqual(self._count_datasets(central_butler, "raw", raw_collection), 2) + self.assertEqual( + self._count_datasets_with_id(central_butler, "raw", raw_collection, self.raw_data_id), + 1) + self.assertEqual( + self._count_datasets_with_id(central_butler, "raw", raw_collection, self.second_data_id), + 1) + self.assertEqual(self._count_datasets(central_butler, "calexp", export_collection), 2) + self.assertEqual( + self._count_datasets_with_id(central_butler, "calexp", export_collection, self.processed_data_id), + 1) + self.assertEqual( + self._count_datasets_with_id(central_butler, "calexp", export_collection, + self.second_processed_data_id), + 1) # Did not export calibs or other inputs. - self._check_datasets(central_butler, - ["cpBias", "gaia", "skyMap", "*Coadd"], export_collection, - 0, {"error": "dnc"}) + self.assertEqual( + self._count_datasets(central_butler, ["cpBias", "gaia", "skyMap", "*Coadd"], export_collection), + 0) # Nothing placed in "input" collections. - self._check_datasets(central_butler, - ["raw", "calexp"], f"{instname}/defaults", 0, {"error": "dnc"}) + self.assertEqual( + self._count_datasets(central_butler, ["raw", "calexp"], f"{instname}/defaults"), + 0)
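After a few exports, the central repository can be inspected directly to confirm where the products landed. A possible spot check is sketched below; the repository path, glob pattern, and the decision to count all dataset types are examples, not values used by the service or the tests.

.. code-block:: python

    # Spot-check the central repo after an export; path and pattern are examples.
    from lsst.daf.butler import Butler

    butler = Butler("/path/to/central/repo")
    for name in butler.registry.queryCollections("*/prompt/output-*"):
        count = len(set(butler.registry.queryDatasets(..., collections=name)))
        print(f"{name}: {count} datasets")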