From 3bcbd74363db152bf3acd15c4677f906620b307f Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Mon, 27 Nov 2023 16:16:46 -0800 Subject: [PATCH 1/7] Do not raise if export_outputs doesn't match. Originally, export_outputs would only be called if the caller knew there were datasets to export, so not finding any files implied the data ID was wrong. Now that export_outputs may be called when the pipeline failed at the beginning, this gives false positives. The original message remains as a warning so that humans can check if it came up unexpectedly. --- python/activator/middleware_interface.py | 2 +- tests/test_middleware_interface.py | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index b204508c..39b92daa 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -857,7 +857,7 @@ def export_outputs(self, exposure_ids: set[int]) -> None: exported = True _log.info(f"Pipeline products saved to collection '{output_run}'.") if not exported: - raise ValueError(f"No datasets match visit={self.visit} and exposures={exposure_ids}.") + _log.warning("No datasets match visit=%s and exposures=%s.", self.visit, exposure_ids) @staticmethod def _get_safe_dataset_types(butler): diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index 080f5115..9ad26f32 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -989,10 +989,6 @@ def test_export_outputs(self): self._count_datasets(central_butler, ["raw", "calexp"], f"{instname}/defaults"), 0) - def test_export_outputs_bad_exposure(self): - with self.assertRaises(ValueError): - self.interface.export_outputs({88}) - def test_export_outputs_retry(self): self.interface.export_outputs({self.raw_data_id["exposure"]}) self.second_interface.export_outputs({self.second_data_id["exposure"]}) From 1ecf8f3ca7c289b5366ba4defd3c5a096163cc81 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Thu, 16 Nov 2023 13:41:41 -0800 Subject: [PATCH 2/7] Add exceptions for retriable/non-retriable errors. These exceptions are intended as wrappers, to keep activator from needing to know details of what exceptions the Middleware/pipeline code can raise. --- python/activator/exception.py | 74 +++++++++++++++++++++++ tests/test_exception.py | 107 ++++++++++++++++++++++++++++++++++ 2 files changed, 181 insertions(+) create mode 100644 python/activator/exception.py create mode 100644 tests/test_exception.py diff --git a/python/activator/exception.py b/python/activator/exception.py new file mode 100644 index 00000000..df25c6d7 --- /dev/null +++ b/python/activator/exception.py @@ -0,0 +1,74 @@ +# This file is part of prompt_processing. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+
+__all__ = ["NonRetriableError", "RetriableError"]
+
+
+class NonRetriableError(Exception):
+    """A processing failure that must not be retried, regardless of the
+    underlying error.
+
+    This class is intended as an adapter for another exception, and exposes
+    the ``nested`` property for this purpose.
+    """
+
+    @property
+    def nested(self):
+        """The exception nested inside this one (`BaseException`, read-only).
+
+        This property is guaranteed non-raising, to make it easier to use
+        inside exception handlers. If there is no nested exception, it is equal
+        to `None`.
+        """
+        if self.__cause__:
+            return self.__cause__
+        elif self.__context__ and not self.__suppress_context__:
+            return self.__context__
+        else:
+            return None
+
+
+class RetriableError(Exception):
+    """A processing failure that can be safely retried.
+
+    This class serves as an abstraction layer between the activator (which is
+    responsible for retry behavior) and the Middleware interface (which has the
+    information to determine whether retries are safe).
+
+    This class is intended as an adapter for another exception, and exposes
+    the ``nested`` property for this purpose.
+    """
+
+    @property
+    def nested(self):
+        """The exception nested inside this one (`BaseException`, read-only).
+
+        This property is guaranteed non-raising, to make it easier to use
+        inside exception handlers. If there is no nested exception, it is equal
+        to `None`.
+        """
+        if self.__cause__:
+            return self.__cause__
+        elif self.__context__ and not self.__suppress_context__:
+            return self.__context__
+        else:
+            return None
diff --git a/tests/test_exception.py b/tests/test_exception.py
new file mode 100644
index 00000000..c98541db
--- /dev/null
+++ b/tests/test_exception.py
@@ -0,0 +1,107 @@
+# This file is part of prompt_processing.
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
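The wrap-and-inspect pattern these classes support is easiest to see end to end.
The following is a minimal sketch, not part of the patch: write_to_apdb is a
hypothetical stand-in for a Middleware call, while NonRetriableError and its
nested property come from exception.py above.

    from activator.exception import NonRetriableError

    def write_to_apdb(records):
        """Hypothetical stand-in for a call that can fail mid-write."""
        raise RuntimeError("connection reset")

    try:
        try:
            write_to_apdb([])
        except RuntimeError as e:
            # A partial write may have happened; retrying could duplicate rows.
            raise NonRetriableError("APDB potentially modified") from e
    except NonRetriableError as wrapper:
        # nested is guaranteed not to raise inside a handler; here it
        # recovers the original RuntimeError through __cause__.
        print(f"Not retrying; underlying failure: {wrapper.nested!r}")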
+ + +import unittest + +from activator.exception import NonRetriableError, RetriableError + + +class NonRetriableErrorTest(unittest.TestCase): + def setUp(self): + super().setUp() + + self.inner = RuntimeError("Foo!") + + def test_raise_chained(self): + try: + raise NonRetriableError("Cannot compute!") from self.inner + except NonRetriableError as e: + self.assertIs(e.nested, self.inner) + + def test_raise_context(self): + try: + try: + raise self.inner + except Exception: + raise NonRetriableError("Cannot compute!") + except NonRetriableError as e: + self.assertIs(e.nested, self.inner) + + def test_raise_orphaned(self): + try: + raise NonRetriableError("Cannot compute!") + except NonRetriableError as e: + self.assertIs(e.nested, None) + + try: + raise NonRetriableError("Cannot compute!") from None + except NonRetriableError as e: + self.assertIs(e.nested, None) + + try: + try: + raise self.inner + except Exception: + raise NonRetriableError("Cannot compute!") from None + except NonRetriableError as e: + self.assertIs(e.nested, None) + + +class RetriableErrorTest(unittest.TestCase): + def setUp(self): + super().setUp() + + self.inner = RuntimeError("Foo!") + + def test_raise_chained(self): + try: + raise RetriableError("Cannot compute!") from self.inner + except RetriableError as e: + self.assertIs(e.nested, self.inner) + + def test_raise_context(self): + try: + try: + raise self.inner + except Exception: + raise RetriableError("Cannot compute!") + except RetriableError as e: + self.assertIs(e.nested, self.inner) + + def test_raise_orphaned(self): + try: + raise RetriableError("Cannot compute!") + except RetriableError as e: + self.assertIs(e.nested, None) + + try: + raise RetriableError("Cannot compute!") from None + except RetriableError as e: + self.assertIs(e.nested, None) + + try: + try: + raise self.inner + except Exception: + raise RetriableError("Cannot compute!") from None + except RetriableError as e: + self.assertIs(e.nested, None) From 6759c12d5a2155577bf8c5b03d5b2e527425ef20 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Mon, 27 Nov 2023 10:44:55 -0800 Subject: [PATCH 3/7] Add exception handlers to the pipeline execution block. These handlers determine whether the error is recoverable, and ensure that any partial outputs are correctly synced to the central repo. Exceptions raised outside of pipeline execution are still the responsibility of the generic Flask handler. --- python/activator/activator.py | 39 ++++++++++++++++++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/python/activator/activator.py b/python/activator/activator.py index 9d687c67..8a217b60 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -34,8 +34,10 @@ import cloudevents.http import confluent_kafka as kafka from flask import Flask, request +from werkzeug.exceptions import ServiceUnavailable from .config import PipelinesConfig +from .exception import NonRetriableError, RetriableError from .logger import setup_usdf_logger from .middleware_interface import get_central_butler, make_local_repo, MiddlewareInterface from .raw import ( @@ -221,6 +223,25 @@ def _parse_bucket_notifications(payload): _log.error("Invalid S3 bucket notification: %s", e) +def _try_export(mwi: MiddlewareInterface, exposures: set[int], log: logging.Logger) -> bool: + """Attempt to export pipeline products, logging any failure. + + This method is designed to be safely run from within exception handlers. 
+ + Returns + ------- + exported : `bool` + `True` if the export was successful, `False` for a (possibly partial) + failure. + """ + try: + mwi.export_outputs(exposures) + return True + except Exception: + log.exception("Central repo export failed. Some output products may be lost.") + return False + + @app.route("/next-visit", methods=["POST"]) def next_visit_handler() -> Tuple[str, int]: """A Flask view function for handling next-visit events. @@ -341,8 +362,24 @@ def next_visit_handler() -> Tuple[str, int]: try: mwi.run_pipeline(expid_set) # TODO: broadcast alerts here - # TODO: call export_outputs on success or permanent failure in DM-34141 mwi.export_outputs(expid_set) + except RetriableError as e: + error = e.nested if e.nested else e + _log.error("Processing failed: ", exc_info=error) + # Do not export, to leave room for the next attempt + # Service unavailable is not quite right, but no better standard response + raise ServiceUnavailable(f"A temporary error occurred during processing: {error}", + retry_after=10) from None + except NonRetriableError as e: + error = e.nested if e.nested else e + _log.error("Processing failed: ", exc_info=error) + _try_export(mwi, expid_set, _log) + return f"An error occurred during processing: {error}.\nThe system's state has " \ + "permanently changed, so this request should **NOT** be retried.", 500 + except Exception as e: + _log.error("Processing failed: ", exc_info=e) + _try_export(mwi, expid_set, _log) + return f"An error occurred during processing: {e}.", 500 finally: # TODO: run_pipeline requires a clean run until DM-38041. mwi.clean_local_repo(expid_set) From 0c0a835cff78e6b04935e48bbd96d744cafd27bd Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Wed, 29 Nov 2023 11:47:57 -0800 Subject: [PATCH 4/7] Raise NonRetriableError after APDB write. This ensures that failures that have made permanent changes will never be retried. --- python/activator/middleware_interface.py | 41 ++++++++++++++++++++- tests/test_middleware_interface.py | 47 ++++++++++++++++++++++++ 2 files changed, 86 insertions(+), 2 deletions(-) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index 39b92daa..e8b9dd59 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -37,12 +37,14 @@ import lsst.afw.cameraGeom from lsst.ctrl.mpexec import SeparablePipelineExecutor from lsst.daf.butler import Butler, CollectionType +import lsst.dax.apdb import lsst.geom from lsst.meas.algorithms.htmIndexer import HtmIndexer import lsst.obs.base import lsst.pipe.base from .config import PipelinesConfig +from .exception import NonRetriableError from .visit import FannedOutVisit _log = logging.getLogger("lsst." + __name__) @@ -715,6 +717,15 @@ def _prep_pipeline(self, pipeline_file) -> lsst.pipe.base.Pipeline: _log.debug("diaPipe is not in this pipeline.") return pipeline + # TODO: unify with _prep_pipeline after DM-41549 + def _make_apdb(self) -> lsst.dax.apdb.Apdb: + """Create an Apdb object for accessing this service's APDB. + """ + config = lsst.dax.apdb.ApdbSql.ConfigClass() + config.db_url = self._apdb_uri + config.namespace = self._apdb_namespace + return lsst.dax.apdb.ApdbSql(config) + def _download(self, remote): """Download an image located on a remote store. @@ -826,8 +837,34 @@ def run_pipeline(self, exposure_ids: set[int]) -> None: # TODO: after DM-38041, move pre-execution to one-time repo setup. 
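The retriability decision that patch 4 builds on _make_apdb boils down to one
probe: if the visit's rows never reached the APDB, the failed attempt left no
permanent state behind. A condensed sketch of that probe, assuming the ApdbSql
backend and the configuration fields used by _make_apdb above (containsCcdVisit
is the ApdbSql-only method the hunk below relies on):

    import lsst.dax.apdb

    def apdb_has_visit(apdb_uri, apdb_namespace, ccd_visit_id):
        """Return True if the APDB already holds rows for this ccdVisitId."""
        config = lsst.dax.apdb.ApdbSql.ConfigClass()
        config.db_url = apdb_uri
        config.namespace = apdb_namespace
        apdb = lsst.dax.apdb.ApdbSql(config)
        # HACK mirrored from the patch: containsCcdVisit exists only on ApdbSql.
        return apdb.containsCcdVisit(ccd_visit_id)

When this probe returns False after a pipeline failure, the original exception
propagates unchanged; when it returns True, run_pipeline wraps the failure in
NonRetriableError("APDB modified") so that the attempt is never retried.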
executor.pre_execute_qgraph(qgraph, register_dataset_types=True, save_init_outputs=True) _log.info(f"Running '{pipeline._pipelineIR.description}' on {where}") - executor.run_pipeline(qgraph) - _log.info("Pipeline successfully run.") + try: + executor.run_pipeline(qgraph) + _log.info("Pipeline successfully run.") + except Exception as e: + state_changed = True # better safe than sorry + try: + data_ids = set(self.butler.registry.queryDataIds( + ["instrument", "visit", "detector"], where=where).expanded()) + if len(data_ids) == 1: + data_id = list(data_ids)[0] + packer = self.instrument.make_default_dimension_packer(data_id, is_exposure=False) + ccd_visit_id = packer.pack(data_id, returnMaxBits=False) + apdb = self._make_apdb() + # HACK: this method only works for ApdbSql; not needed after DM-41671 + if not apdb.containsCcdVisit(ccd_visit_id): + state_changed = False + else: + # Don't know how this could happen, so won't try to handle it gracefully. + _log.warning("Unexpected visit ids: %s. Assuming APDB modified.", data_ids) + except Exception: + # Failure in registry or APDB queries + _log.exception("Could not determine APDB state, assuming modified.") + raise NonRetriableError("APDB potentially modified") from e + else: + if state_changed: + raise NonRetriableError("APDB modified") from e + else: + raise break else: # TODO: a good place for a custom exception? diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index 9ad26f32..2cd5c7af 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -30,6 +30,7 @@ import astropy.coordinates import astropy.units as u +import psycopg2 import astro_metadata_translator import lsst.pex.config @@ -42,6 +43,7 @@ import lsst.resources from activator.config import PipelinesConfig +from activator.exception import NonRetriableError from activator.visit import FannedOutVisit from activator.middleware_interface import get_central_butler, make_local_repo, MiddlewareInterface, \ _filter_datasets, _prepend_collection, _remove_from_chain, _filter_calibs_by_date, _MissingDatasetError @@ -557,6 +559,51 @@ def test_run_pipeline_bad_visits(self): with self.assertRaisesRegex(RuntimeError, "No data to process"): self.interface.run_pipeline({2}) + def test_run_pipeline_early_exception(self): + """Test behavior when execution fails in single-frame processing. + """ + self._prepare_run_pipeline() + + with unittest.mock.patch( + "activator.middleware_interface.SeparablePipelineExecutor.pre_execute_qgraph"), \ + unittest.mock.patch("activator.middleware_interface.SeparablePipelineExecutor.run_pipeline") \ + as mock_run, \ + unittest.mock.patch("lsst.dax.apdb.ApdbSql.containsCcdVisit") as mock_query: + mock_run.side_effect = RuntimeError("The pipeline doesn't like you.") + mock_query.return_value = False + with self.assertRaises(RuntimeError): + self.interface.run_pipeline({1}) + + def test_run_pipeline_late_exception(self): + """Test behavior when execution fails in diaPipe cleanup. 
+ """ + self._prepare_run_pipeline() + + with unittest.mock.patch( + "activator.middleware_interface.SeparablePipelineExecutor.pre_execute_qgraph"), \ + unittest.mock.patch("activator.middleware_interface.SeparablePipelineExecutor.run_pipeline") \ + as mock_run, \ + unittest.mock.patch("lsst.dax.apdb.ApdbSql.containsCcdVisit") as mock_query: + mock_run.side_effect = RuntimeError("The pipeline doesn't like you.") + mock_query.return_value = True + with self.assertRaises(NonRetriableError): + self.interface.run_pipeline({1}) + + def test_run_pipeline_cascading_exception(self): + """Test behavior when Butler and/or APDB access has failed completely. + """ + self._prepare_run_pipeline() + + with unittest.mock.patch( + "activator.middleware_interface.SeparablePipelineExecutor.pre_execute_qgraph"), \ + unittest.mock.patch("activator.middleware_interface.SeparablePipelineExecutor.run_pipeline") \ + as mock_run, \ + unittest.mock.patch("lsst.dax.apdb.ApdbSql.containsCcdVisit") as mock_query: + mock_run.side_effect = RuntimeError("The pipeline doesn't like you.") + mock_query.side_effect = psycopg2.OperationalError("Database? What database?") + with self.assertRaises(NonRetriableError): + self.interface.run_pipeline({1}) + def test_get_output_run(self): filename = "ApPipe.yaml" for date in [datetime.date.today(), datetime.datetime.today()]: From 17684abc96cb8b0cd22b133a0501cea9dd94afdc Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Wed, 29 Nov 2023 11:52:59 -0800 Subject: [PATCH 5/7] Raise NonRetriableError after pipeline completion. This covers states that have modified any of the APDB, the alert stream, or the central repo. --- python/activator/activator.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/python/activator/activator.py b/python/activator/activator.py index 8a217b60..3e336c85 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -361,8 +361,12 @@ def next_visit_handler() -> Tuple[str, int]: _log.info("Running pipeline...") try: mwi.run_pipeline(expid_set) - # TODO: broadcast alerts here - mwi.export_outputs(expid_set) + try: + # TODO: broadcast alerts here + mwi.export_outputs(expid_set) + except Exception as e: + raise NonRetriableError("APDB and possibly alerts or central repo modified") \ + from e except RetriableError as e: error = e.nested if e.nested else e _log.error("Processing failed: ", exc_info=error) From 091d7fce9bb717655df7a53404f1719e58af7b1f Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Wed, 29 Nov 2023 12:03:59 -0800 Subject: [PATCH 6/7] Stop looping over _export_subset. Now that _export_subset uses Butler.transfer_from, it emits a log even if there's nothing to export. This leads to a confusing log stream. _export_subset can handle multiple input collections, so just handle them all at once. --- python/activator/middleware_interface.py | 26 +++++++++++------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index e8b9dd59..e3627fa1 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -879,21 +879,19 @@ def export_outputs(self, exposure_ids: set[int]) -> None: exposure_ids : `set` [`int`] Identifiers of the exposures that were processed. """ - exported = False # Rather than determining which pipeline was run, just try to export all of them. 
- for pipeline_file in self._get_pipeline_files(): - output_run = self._get_output_run(pipeline_file, self._day_obs) - exports = self._export_subset(exposure_ids, - # TODO: find a way to merge datasets like *_config - # or *_schema that are duplicated across multiple - # workers. - self._get_safe_dataset_types(self.butler), - in_collections=output_run, - ) - if exports: - exported = True - _log.info(f"Pipeline products saved to collection '{output_run}'.") - if not exported: + output_runs = [self._get_output_run(f, self._day_obs) for f in self._get_pipeline_files()] + exports = self._export_subset(exposure_ids, + # TODO: find a way to merge datasets like *_config + # or *_schema that are duplicated across multiple + # workers. + self._get_safe_dataset_types(self.butler), + in_collections=output_runs, + ) + if exports: + populated_runs = {ref.run for ref in exports} + _log.info(f"Pipeline products saved to collections {populated_runs}.") + else: _log.warning("No datasets match visit=%s and exposures=%s.", self.visit, exposure_ids) @staticmethod From 571b22cb0303c3a449f15f4065ab1f42905d5ade Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Wed, 29 Nov 2023 13:55:18 -0800 Subject: [PATCH 7/7] Fix deprecation warnings from DataCoordinate. --- tests/test_middleware_interface.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index 2cd5c7af..08f6ccc0 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -659,10 +659,10 @@ def test_clean_local_repo(self): butler.dimensions, self.interface.instrument, self.next_visit) - processed_data_id = {(k if k != "exposure" else "visit"): v for k, v in raw_data_id.items()} + processed_data_id = {(k if k != "exposure" else "visit"): v for k, v in raw_data_id.required.items()} butler_tests.addDataIdValue(butler, "exposure", raw_data_id["exposure"]) butler_tests.addDataIdValue(butler, "visit", processed_data_id["visit"]) - butler_tests.addDatasetType(butler, "raw", raw_data_id.keys(), "Exposure") + butler_tests.addDatasetType(butler, "raw", raw_data_id.required.keys(), "Exposure") butler_tests.addDatasetType(butler, "src", processed_data_id.keys(), "SourceCatalog") butler_tests.addDatasetType(butler, "calexp", processed_data_id.keys(), "ExposureF") @@ -973,9 +973,10 @@ def _simulate_run(self): """Create a mock pipeline execution that stores a calexp for self.raw_data_id. """ exp = lsst.afw.image.ExposureF(20, 20) - self.processed_data_id = {(k if k != "exposure" else "visit"): v for k, v in self.raw_data_id.items()} + self.processed_data_id = {(k if k != "exposure" else "visit"): v + for k, v in self.raw_data_id.required.items()} self.second_processed_data_id = {(k if k != "exposure" else "visit"): v - for k, v in self.second_data_id.items()} + for k, v in self.second_data_id.required.items()} # Dataset types defined for local Butler on pipeline run, but code # assumes output types already exist in central repo. butler_tests.addDatasetType(self.interface.central_butler, "calexp",
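Patch 1 removed test_export_outputs_bad_exposure along with the ValueError,
which leaves the new warning path untested. A hypothetical replacement test,
following the conventions of tests/test_middleware_interface.py (the logger
name is inferred from the module's logging.getLogger("lsst." + __name__)
pattern and is not confirmed by these patches):

    def test_export_outputs_bad_exposure(self):
        # Exposure 88 matches no datasets; expect a warning, not an exception.
        with self.assertLogs("lsst.activator.middleware_interface",
                             level="WARNING") as recorded:
            self.interface.export_outputs({88})
        self.assertTrue(any("No datasets match" in line
                            for line in recorded.output))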