diff --git a/python/activator/activator.py b/python/activator/activator.py
index 9d687c67..3e336c85 100644
--- a/python/activator/activator.py
+++ b/python/activator/activator.py
@@ -34,8 +34,10 @@
import cloudevents.http
import confluent_kafka as kafka
from flask import Flask, request
+from werkzeug.exceptions import ServiceUnavailable
from .config import PipelinesConfig
+from .exception import NonRetriableError, RetriableError
from .logger import setup_usdf_logger
from .middleware_interface import get_central_butler, make_local_repo, MiddlewareInterface
from .raw import (
@@ -221,6 +223,25 @@ def _parse_bucket_notifications(payload):
_log.error("Invalid S3 bucket notification: %s", e)
+def _try_export(mwi: MiddlewareInterface, exposures: set[int], log: logging.Logger) -> bool:
+ """Attempt to export pipeline products, logging any failure.
+
+    This function is designed to be called safely from within exception handlers.
+
+ Returns
+ -------
+ exported : `bool`
+ `True` if the export was successful, `False` for a (possibly partial)
+ failure.
+ """
+ try:
+ mwi.export_outputs(exposures)
+ return True
+ except Exception:
+ log.exception("Central repo export failed. Some output products may be lost.")
+ return False
+
+
@app.route("/next-visit", methods=["POST"])
def next_visit_handler() -> Tuple[str, int]:
"""A Flask view function for handling next-visit events.
@@ -340,9 +361,29 @@ def next_visit_handler() -> Tuple[str, int]:
_log.info("Running pipeline...")
try:
mwi.run_pipeline(expid_set)
- # TODO: broadcast alerts here
- # TODO: call export_outputs on success or permanent failure in DM-34141
- mwi.export_outputs(expid_set)
+ try:
+ # TODO: broadcast alerts here
+ mwi.export_outputs(expid_set)
+ except Exception as e:
+ raise NonRetriableError("APDB and possibly alerts or central repo modified") \
+ from e
+ except RetriableError as e:
+ error = e.nested if e.nested else e
+            _log.error("Processing failed:", exc_info=error)
+ # Do not export, to leave room for the next attempt
+            # "Service Unavailable" is not quite right, but there is no better standard response.
+ raise ServiceUnavailable(f"A temporary error occurred during processing: {error}",
+ retry_after=10) from None
+ except NonRetriableError as e:
+ error = e.nested if e.nested else e
+            _log.error("Processing failed:", exc_info=error)
+ _try_export(mwi, expid_set, _log)
+ return f"An error occurred during processing: {error}.\nThe system's state has " \
+ "permanently changed, so this request should **NOT** be retried.", 500
+ except Exception as e:
+            _log.error("Processing failed:", exc_info=e)
+ _try_export(mwi, expid_set, _log)
+ return f"An error occurred during processing: {e}.", 500
finally:
# TODO: run_pipeline requires a clean run until DM-38041.
mwi.clean_local_repo(expid_set)
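
Review note: the handler above now maps the two new exception types onto distinct HTTP outcomes. Below is a minimal standalone sketch of that contract for reviewers who want to trace it without the Flask scaffolding; `FakeMiddleware` and `handle_visit` are illustration-only names, not part of the activator module.

```python
import logging

from activator.exception import NonRetriableError, RetriableError

_log = logging.getLogger(__name__)


class FakeMiddleware:
    """Stand-in for MiddlewareInterface that fails retriably."""

    def run_pipeline(self, exposures):
        raise RetriableError("raw never arrived") from TimeoutError("S3 timeout")

    def export_outputs(self, exposures):
        pass


def handle_visit(mwi, exposures):
    """Return a (body, status) pair the way next_visit_handler now does."""
    try:
        mwi.run_pipeline(exposures)
        mwi.export_outputs(exposures)
        return "Pipeline executed", 200
    except RetriableError as e:
        # No export here: the repo is left untouched so a redelivered
        # message can retry cleanly (the real handler raises a 503).
        return f"A temporary error occurred: {e.nested or e}", 503
    except NonRetriableError as e:
        # State has already changed; salvage outputs, then forbid retries.
        try:
            mwi.export_outputs(exposures)
        except Exception:
            _log.exception("Export failed; some products may be lost.")
        return f"A permanent error occurred: {e.nested or e}", 500


print(handle_visit(FakeMiddleware(), {42}))  # ('A temporary error occurred: S3 timeout', 503)
```
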
diff --git a/python/activator/exception.py b/python/activator/exception.py
new file mode 100644
index 00000000..df25c6d7
--- /dev/null
+++ b/python/activator/exception.py
@@ -0,0 +1,74 @@
+# This file is part of prompt_processing.
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+
+__all__ = ["NonRetriableError", "RetriableError"]
+
+
+class NonRetriableError(Exception):
+ """A processing failure that must not be retried, regardless of the
+ underlying error.
+
+    This class is intended as an adapter for another exception, and exposes
+    the ``nested`` property for this purpose.
+ """
+
+ @property
+ def nested(self):
+ """The exception nested inside this one (`BaseException`, read-only).
+
+        This property is guaranteed not to raise, making it safe to use
+        inside exception handlers. If there is no nested exception, the
+        value is `None`.
+ """
+ if self.__cause__:
+ return self.__cause__
+ elif self.__context__ and not self.__suppress_context__:
+ return self.__context__
+ else:
+ return None
+
+
+class RetriableError(Exception):
+ """A processing failure that can be safely retried.
+
+ This class serves as an abstraction layer between the activator (which is
+ responsible for retry behavior) and the Middleware interface (which has the
+ information to determine whether retries are safe).
+
+    This class is intended as an adapter for another exception, and exposes
+    the ``nested`` property for this purpose.
+ """
+
+ @property
+ def nested(self):
+ """The exception nested inside this one (`BaseException`, read-only).
+
+        This property is guaranteed not to raise, making it safe to use
+        inside exception handlers. If there is no nested exception, the
+        value is `None`.
+ """
+ if self.__cause__:
+ return self.__cause__
+ elif self.__context__ and not self.__suppress_context__:
+ return self.__context__
+ else:
+ return None
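
The `nested` property simply reflects standard Python exception chaining, so callers can rely on the usual `raise ... from ...` idioms. A quick illustration, using only the module added above (the same cases the new unit tests cover):

```python
from activator.exception import NonRetriableError

# Explicit chaining: `raise ... from err` makes `err` the nested exception.
try:
    raise NonRetriableError("cannot compute") from ValueError("bad visit")
except NonRetriableError as e:
    assert isinstance(e.nested, ValueError)

# Implicit context: raising inside an `except` block also populates `nested`.
try:
    try:
        raise KeyError("missing exposure")
    except KeyError:
        raise NonRetriableError("cannot compute")
except NonRetriableError as e:
    assert isinstance(e.nested, KeyError)

# `from None` suppresses the context, so `nested` is None.
try:
    try:
        raise KeyError("missing exposure")
    except KeyError:
        raise NonRetriableError("cannot compute") from None
except NonRetriableError as e:
    assert e.nested is None
```
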
diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py
index b204508c..e3627fa1 100644
--- a/python/activator/middleware_interface.py
+++ b/python/activator/middleware_interface.py
@@ -37,12 +37,14 @@
import lsst.afw.cameraGeom
from lsst.ctrl.mpexec import SeparablePipelineExecutor
from lsst.daf.butler import Butler, CollectionType
+import lsst.dax.apdb
import lsst.geom
from lsst.meas.algorithms.htmIndexer import HtmIndexer
import lsst.obs.base
import lsst.pipe.base
from .config import PipelinesConfig
+from .exception import NonRetriableError
from .visit import FannedOutVisit
_log = logging.getLogger("lsst." + __name__)
@@ -715,6 +717,15 @@ def _prep_pipeline(self, pipeline_file) -> lsst.pipe.base.Pipeline:
_log.debug("diaPipe is not in this pipeline.")
return pipeline
+ # TODO: unify with _prep_pipeline after DM-41549
+ def _make_apdb(self) -> lsst.dax.apdb.Apdb:
+ """Create an Apdb object for accessing this service's APDB.
+ """
+ config = lsst.dax.apdb.ApdbSql.ConfigClass()
+ config.db_url = self._apdb_uri
+ config.namespace = self._apdb_namespace
+ return lsst.dax.apdb.ApdbSql(config)
+
def _download(self, remote):
"""Download an image located on a remote store.
@@ -826,8 +837,34 @@ def run_pipeline(self, exposure_ids: set[int]) -> None:
# TODO: after DM-38041, move pre-execution to one-time repo setup.
executor.pre_execute_qgraph(qgraph, register_dataset_types=True, save_init_outputs=True)
_log.info(f"Running '{pipeline._pipelineIR.description}' on {where}")
- executor.run_pipeline(qgraph)
- _log.info("Pipeline successfully run.")
+ try:
+ executor.run_pipeline(qgraph)
+ _log.info("Pipeline successfully run.")
+ except Exception as e:
+ state_changed = True # better safe than sorry
+ try:
+ data_ids = set(self.butler.registry.queryDataIds(
+ ["instrument", "visit", "detector"], where=where).expanded())
+ if len(data_ids) == 1:
+ data_id = list(data_ids)[0]
+ packer = self.instrument.make_default_dimension_packer(data_id, is_exposure=False)
+ ccd_visit_id = packer.pack(data_id, returnMaxBits=False)
+ apdb = self._make_apdb()
+ # HACK: this method only works for ApdbSql; not needed after DM-41671
+ if not apdb.containsCcdVisit(ccd_visit_id):
+ state_changed = False
+ else:
+ # Don't know how this could happen, so won't try to handle it gracefully.
+ _log.warning("Unexpected visit ids: %s. Assuming APDB modified.", data_ids)
+ except Exception:
+ # Failure in registry or APDB queries
+ _log.exception("Could not determine APDB state, assuming modified.")
+ raise NonRetriableError("APDB potentially modified") from e
+ else:
+ if state_changed:
+ raise NonRetriableError("APDB modified") from e
+ else:
+ raise
break
else:
# TODO: a good place for a custom exception?
@@ -842,22 +879,20 @@ def export_outputs(self, exposure_ids: set[int]) -> None:
exposure_ids : `set` [`int`]
Identifiers of the exposures that were processed.
"""
- exported = False
# Rather than determining which pipeline was run, just try to export all of them.
- for pipeline_file in self._get_pipeline_files():
- output_run = self._get_output_run(pipeline_file, self._day_obs)
- exports = self._export_subset(exposure_ids,
- # TODO: find a way to merge datasets like *_config
- # or *_schema that are duplicated across multiple
- # workers.
- self._get_safe_dataset_types(self.butler),
- in_collections=output_run,
- )
- if exports:
- exported = True
- _log.info(f"Pipeline products saved to collection '{output_run}'.")
- if not exported:
- raise ValueError(f"No datasets match visit={self.visit} and exposures={exposure_ids}.")
+ output_runs = [self._get_output_run(f, self._day_obs) for f in self._get_pipeline_files()]
+ exports = self._export_subset(exposure_ids,
+ # TODO: find a way to merge datasets like *_config
+ # or *_schema that are duplicated across multiple
+ # workers.
+ self._get_safe_dataset_types(self.butler),
+ in_collections=output_runs,
+ )
+ if exports:
+ populated_runs = {ref.run for ref in exports}
+ _log.info(f"Pipeline products saved to collections {populated_runs}.")
+ else:
+ _log.warning("No datasets match visit=%s and exposures=%s.", self.visit, exposure_ids)
@staticmethod
def _get_safe_dataset_types(butler):
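
Review note: the heart of the `run_pipeline` change is deciding whether the APDB was touched before the failure. Here is a condensed, dependency-free sketch of that decision; `classify_pipeline_failure`, `resolve_data_ids`, and `apdb_contains_visit` are hypothetical stand-ins for the inline code, the Butler registry query, and the `ApdbSql.containsCcdVisit` probe above.

```python
from activator.exception import NonRetriableError


def classify_pipeline_failure(error, resolve_data_ids, apdb_contains_visit):
    """Re-raise ``error``, wrapped in NonRetriableError if the APDB may
    already hold results for this visit (and is therefore unsafe to retry).
    """
    state_changed = True  # better safe than sorry
    try:
        # resolve_data_ids is assumed to return the set of (instrument,
        # visit, detector) data IDs matching the processed exposures.
        data_ids = resolve_data_ids()
        if len(data_ids) == 1:
            state_changed = apdb_contains_visit(data_ids.pop())
        # else: unexpected ids; keep the pessimistic default
    except Exception:
        # The probe itself failed, so we cannot prove the APDB is clean.
        raise NonRetriableError("APDB potentially modified") from error
    if state_changed:
        raise NonRetriableError("APDB modified") from error
    raise error
```

Every branch re-raises, so the caller in the activator always sees exactly one of two things: the original exception, or a `NonRetriableError` chaining it.
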
diff --git a/tests/test_exception.py b/tests/test_exception.py
new file mode 100644
index 00000000..c98541db
--- /dev/null
+++ b/tests/test_exception.py
@@ -0,0 +1,107 @@
+# This file is part of prompt_processing.
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+
+import unittest
+
+from activator.exception import NonRetriableError, RetriableError
+
+
+class NonRetriableErrorTest(unittest.TestCase):
+ def setUp(self):
+ super().setUp()
+
+ self.inner = RuntimeError("Foo!")
+
+ def test_raise_chained(self):
+ try:
+ raise NonRetriableError("Cannot compute!") from self.inner
+ except NonRetriableError as e:
+ self.assertIs(e.nested, self.inner)
+
+ def test_raise_context(self):
+ try:
+ try:
+ raise self.inner
+ except Exception:
+ raise NonRetriableError("Cannot compute!")
+ except NonRetriableError as e:
+ self.assertIs(e.nested, self.inner)
+
+ def test_raise_orphaned(self):
+ try:
+ raise NonRetriableError("Cannot compute!")
+ except NonRetriableError as e:
+ self.assertIs(e.nested, None)
+
+ try:
+ raise NonRetriableError("Cannot compute!") from None
+ except NonRetriableError as e:
+ self.assertIs(e.nested, None)
+
+ try:
+ try:
+ raise self.inner
+ except Exception:
+ raise NonRetriableError("Cannot compute!") from None
+ except NonRetriableError as e:
+ self.assertIs(e.nested, None)
+
+
+class RetriableErrorTest(unittest.TestCase):
+ def setUp(self):
+ super().setUp()
+
+ self.inner = RuntimeError("Foo!")
+
+ def test_raise_chained(self):
+ try:
+ raise RetriableError("Cannot compute!") from self.inner
+ except RetriableError as e:
+ self.assertIs(e.nested, self.inner)
+
+ def test_raise_context(self):
+ try:
+ try:
+ raise self.inner
+ except Exception:
+ raise RetriableError("Cannot compute!")
+ except RetriableError as e:
+ self.assertIs(e.nested, self.inner)
+
+ def test_raise_orphaned(self):
+ try:
+ raise RetriableError("Cannot compute!")
+ except RetriableError as e:
+ self.assertIs(e.nested, None)
+
+ try:
+ raise RetriableError("Cannot compute!") from None
+ except RetriableError as e:
+ self.assertIs(e.nested, None)
+
+ try:
+ try:
+ raise self.inner
+ except Exception:
+ raise RetriableError("Cannot compute!") from None
+ except RetriableError as e:
+ self.assertIs(e.nested, None)
diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py
index 080f5115..08f6ccc0 100644
--- a/tests/test_middleware_interface.py
+++ b/tests/test_middleware_interface.py
@@ -30,6 +30,7 @@
import astropy.coordinates
import astropy.units as u
+import psycopg2
import astro_metadata_translator
import lsst.pex.config
@@ -42,6 +43,7 @@
import lsst.resources
from activator.config import PipelinesConfig
+from activator.exception import NonRetriableError
from activator.visit import FannedOutVisit
from activator.middleware_interface import get_central_butler, make_local_repo, MiddlewareInterface, \
_filter_datasets, _prepend_collection, _remove_from_chain, _filter_calibs_by_date, _MissingDatasetError
@@ -557,6 +559,51 @@ def test_run_pipeline_bad_visits(self):
with self.assertRaisesRegex(RuntimeError, "No data to process"):
self.interface.run_pipeline({2})
+ def test_run_pipeline_early_exception(self):
+ """Test behavior when execution fails in single-frame processing.
+ """
+ self._prepare_run_pipeline()
+
+ with unittest.mock.patch(
+ "activator.middleware_interface.SeparablePipelineExecutor.pre_execute_qgraph"), \
+ unittest.mock.patch("activator.middleware_interface.SeparablePipelineExecutor.run_pipeline") \
+ as mock_run, \
+ unittest.mock.patch("lsst.dax.apdb.ApdbSql.containsCcdVisit") as mock_query:
+ mock_run.side_effect = RuntimeError("The pipeline doesn't like you.")
+ mock_query.return_value = False
+ with self.assertRaises(RuntimeError):
+ self.interface.run_pipeline({1})
+
+ def test_run_pipeline_late_exception(self):
+ """Test behavior when execution fails in diaPipe cleanup.
+ """
+ self._prepare_run_pipeline()
+
+ with unittest.mock.patch(
+ "activator.middleware_interface.SeparablePipelineExecutor.pre_execute_qgraph"), \
+ unittest.mock.patch("activator.middleware_interface.SeparablePipelineExecutor.run_pipeline") \
+ as mock_run, \
+ unittest.mock.patch("lsst.dax.apdb.ApdbSql.containsCcdVisit") as mock_query:
+ mock_run.side_effect = RuntimeError("The pipeline doesn't like you.")
+ mock_query.return_value = True
+ with self.assertRaises(NonRetriableError):
+ self.interface.run_pipeline({1})
+
+ def test_run_pipeline_cascading_exception(self):
+ """Test behavior when Butler and/or APDB access has failed completely.
+ """
+ self._prepare_run_pipeline()
+
+ with unittest.mock.patch(
+ "activator.middleware_interface.SeparablePipelineExecutor.pre_execute_qgraph"), \
+ unittest.mock.patch("activator.middleware_interface.SeparablePipelineExecutor.run_pipeline") \
+ as mock_run, \
+ unittest.mock.patch("lsst.dax.apdb.ApdbSql.containsCcdVisit") as mock_query:
+ mock_run.side_effect = RuntimeError("The pipeline doesn't like you.")
+ mock_query.side_effect = psycopg2.OperationalError("Database? What database?")
+ with self.assertRaises(NonRetriableError):
+ self.interface.run_pipeline({1})
+
def test_get_output_run(self):
filename = "ApPipe.yaml"
for date in [datetime.date.today(), datetime.datetime.today()]:
@@ -612,10 +659,10 @@ def test_clean_local_repo(self):
butler.dimensions,
self.interface.instrument,
self.next_visit)
- processed_data_id = {(k if k != "exposure" else "visit"): v for k, v in raw_data_id.items()}
+ processed_data_id = {(k if k != "exposure" else "visit"): v for k, v in raw_data_id.required.items()}
butler_tests.addDataIdValue(butler, "exposure", raw_data_id["exposure"])
butler_tests.addDataIdValue(butler, "visit", processed_data_id["visit"])
- butler_tests.addDatasetType(butler, "raw", raw_data_id.keys(), "Exposure")
+ butler_tests.addDatasetType(butler, "raw", raw_data_id.required.keys(), "Exposure")
butler_tests.addDatasetType(butler, "src", processed_data_id.keys(), "SourceCatalog")
butler_tests.addDatasetType(butler, "calexp", processed_data_id.keys(), "ExposureF")
@@ -926,9 +973,10 @@ def _simulate_run(self):
"""Create a mock pipeline execution that stores a calexp for self.raw_data_id.
"""
exp = lsst.afw.image.ExposureF(20, 20)
- self.processed_data_id = {(k if k != "exposure" else "visit"): v for k, v in self.raw_data_id.items()}
+ self.processed_data_id = {(k if k != "exposure" else "visit"): v
+ for k, v in self.raw_data_id.required.items()}
self.second_processed_data_id = {(k if k != "exposure" else "visit"): v
- for k, v in self.second_data_id.items()}
+ for k, v in self.second_data_id.required.items()}
# Dataset types defined for local Butler on pipeline run, but code
# assumes output types already exist in central repo.
butler_tests.addDatasetType(self.interface.central_butler, "calexp",
@@ -989,10 +1037,6 @@ def test_export_outputs(self):
self._count_datasets(central_butler, ["raw", "calexp"], f"{instname}/defaults"),
0)
- def test_export_outputs_bad_exposure(self):
- with self.assertRaises(ValueError):
- self.interface.export_outputs({88})
-
def test_export_outputs_retry(self):
self.interface.export_outputs({self.raw_data_id["exposure"]})
self.second_interface.export_outputs({self.second_data_id["exposure"]})