47 changes: 44 additions & 3 deletions python/activator/activator.py
@@ -34,8 +34,10 @@
import cloudevents.http
import confluent_kafka as kafka
from flask import Flask, request
from werkzeug.exceptions import ServiceUnavailable

from .config import PipelinesConfig
from .exception import NonRetriableError, RetriableError
from .logger import setup_usdf_logger
from .middleware_interface import get_central_butler, make_local_repo, MiddlewareInterface
from .raw import (
@@ -221,6 +223,25 @@ def _parse_bucket_notifications(payload):
_log.error("Invalid S3 bucket notification: %s", e)


def _try_export(mwi: MiddlewareInterface, exposures: set[int], log: logging.Logger) -> bool:
"""Attempt to export pipeline products, logging any failure.

This function is designed to be safely run from within exception handlers.

Returns
-------
exported : `bool`
`True` if the export was successful, `False` for a (possibly partial)
failure.
"""
try:
mwi.export_outputs(exposures)
return True
except Exception:
log.exception("Central repo export failed. Some output products may be lost.")
return False


@app.route("/next-visit", methods=["POST"])
def next_visit_handler() -> Tuple[str, int]:
"""A Flask view function for handling next-visit events.
Expand Down Expand Up @@ -340,9 +361,29 @@ def next_visit_handler() -> Tuple[str, int]:
_log.info("Running pipeline...")
try:
mwi.run_pipeline(expid_set)
# TODO: broadcast alerts here
# TODO: call export_outputs on success or permanent failure in DM-34141
mwi.export_outputs(expid_set)
try:
# TODO: broadcast alerts here
mwi.export_outputs(expid_set)
except Exception as e:
raise NonRetriableError("APDB and possibly alerts or central repo modified") \
from e
except RetriableError as e:
error = e.nested if e.nested else e
_log.error("Processing failed: ", exc_info=error)
# Do not export, to leave room for the next attempt
# Service Unavailable is not quite right, but there is no better standard response
raise ServiceUnavailable(f"A temporary error occurred during processing: {error}",
retry_after=10) from None
except NonRetriableError as e:
error = e.nested if e.nested else e
_log.error("Processing failed: ", exc_info=error)
_try_export(mwi, expid_set, _log)
return f"An error occurred during processing: {error}.\nThe system's state has " \
"permanently changed, so this request should **NOT** be retried.", 500
except Exception as e:
_log.error("Processing failed: ", exc_info=e)
_try_export(mwi, expid_set, _log)
return f"An error occurred during processing: {e}.", 500
finally:
# TODO: run_pipeline requires a clean run until DM-38041.
mwi.clean_local_repo(expid_set)
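
The reworked handler above defines two failure contracts: a RetriableError becomes an HTTP 503 (ServiceUnavailable with retry_after=10), meaning the event may safely be redelivered, while NonRetriableError and unexpected exceptions become a 500 after a best-effort export, meaning the system's state has permanently changed and the request must not be repeated. A minimal client-side sketch of honoring that contract (illustrative only, not part of this PR; it assumes an HTTP caller using the requests library and a hypothetical service URL and payload):

import time
import requests

def post_next_visit(url, payload, max_attempts=3):
    # Illustrative sketch, not part of this PR: retry only when the service
    # says it is safe. `url` and `payload` are hypothetical.
    for _ in range(max_attempts):
        response = requests.post(url, json=payload, timeout=600)
        if response.status_code != 503:
            # Success, or a 500 "do NOT retry" failure: retrying a 500 could
            # duplicate work, because the APDB (and possibly alerts or the
            # central repo) may already have been modified.
            return response
        # A 503 carries the Retry-After hint set by ServiceUnavailable(retry_after=10).
        time.sleep(int(response.headers.get("Retry-After", "10")))
    return response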
74 changes: 74 additions & 0 deletions python/activator/exception.py
@@ -0,0 +1,74 @@
# This file is part of prompt_processing.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.


__all__ = ["NonRetriableError", "RetriableError"]


class NonRetriableError(Exception):
"""A processing failure that must not be retried, regardless of the
underlying error.

This class is intended as an adapter for another exception, and exposes
the ``nested`` property for this purpose.
"""

@property
def nested(self):
"""The exception nested inside this one (`BaseException`, read-only).

This property is guaranteed non-raising, to make it easier to use
inside exception handlers. If there is no nested exception, it is equal
to `None`.
"""
if self.__cause__:
return self.__cause__
elif self.__context__ and not self.__suppress_context__:
return self.__context__
else:
return None


class RetriableError(Exception):
"""A processing failure that can be safely retried.

This class serves as an abstraction layer between the activator (which is
responsible for retry behavior) and the Middleware interface (which has the
information to determine whether retries are safe).

This class is intended as an adapter for another exception, and exposes
the ``nested`` property for this purpose.
"""

@property
def nested(self):
"""The exception nested inside this one (`BaseException`, read-only).

This property is guaranteed non-raising, to make it easier to use
inside exception handlers. If there is no nested exception, it is equal
to `None`.
"""
if self.__cause__:
return self.__cause__
elif self.__context__ and not self.__suppress_context__:
return self.__context__
else:
return None
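
Both classes act as thin adapters around an original failure: the activator raises them with ``raise ... from err`` and later reads ``nested`` to recover the underlying exception. A minimal usage sketch (illustrative only, not part of this PR; it assumes the ``activator`` package is importable):

from activator.exception import NonRetriableError, RetriableError

def run_step(safe_to_retry: bool) -> None:
    # Illustrative sketch, not part of this PR.
    try:
        raise RuntimeError("pipeline step failed")
    except Exception as err:
        if safe_to_retry:
            # Nothing permanent happened yet; the caller may retry.
            raise RetriableError("no permanent state was modified") from err
        # Permanent side effects may exist; the caller must not retry.
        raise NonRetriableError("APDB may already be modified") from err

try:
    run_step(safe_to_retry=False)
except NonRetriableError as wrapper:
    # `nested` resolves __cause__ (or an unsuppressed __context__) and never
    # raises, so it is safe to inspect inside an exception handler.
    print(f"original failure: {wrapper.nested!r}")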
69 changes: 52 additions & 17 deletions python/activator/middleware_interface.py
@@ -37,12 +37,14 @@
import lsst.afw.cameraGeom
from lsst.ctrl.mpexec import SeparablePipelineExecutor
from lsst.daf.butler import Butler, CollectionType
import lsst.dax.apdb
import lsst.geom
from lsst.meas.algorithms.htmIndexer import HtmIndexer
import lsst.obs.base
import lsst.pipe.base

from .config import PipelinesConfig
from .exception import NonRetriableError
from .visit import FannedOutVisit

_log = logging.getLogger("lsst." + __name__)
@@ -715,6 +717,15 @@ def _prep_pipeline(self, pipeline_file) -> lsst.pipe.base.Pipeline:
_log.debug("diaPipe is not in this pipeline.")
return pipeline

# TODO: unify with _prep_pipeline after DM-41549
def _make_apdb(self) -> lsst.dax.apdb.Apdb:
"""Create an Apdb object for accessing this service's APDB.
"""
config = lsst.dax.apdb.ApdbSql.ConfigClass()
config.db_url = self._apdb_uri
config.namespace = self._apdb_namespace
return lsst.dax.apdb.ApdbSql(config)

def _download(self, remote):
"""Download an image located on a remote store.

@@ -826,8 +837,34 @@ def run_pipeline(self, exposure_ids: set[int]) -> None:
# TODO: after DM-38041, move pre-execution to one-time repo setup.
executor.pre_execute_qgraph(qgraph, register_dataset_types=True, save_init_outputs=True)
_log.info(f"Running '{pipeline._pipelineIR.description}' on {where}")
executor.run_pipeline(qgraph)
_log.info("Pipeline successfully run.")
try:
executor.run_pipeline(qgraph)
_log.info("Pipeline successfully run.")
except Exception as e:
state_changed = True # better safe than sorry
try:
data_ids = set(self.butler.registry.queryDataIds(
["instrument", "visit", "detector"], where=where).expanded())
if len(data_ids) == 1:
data_id = list(data_ids)[0]
packer = self.instrument.make_default_dimension_packer(data_id, is_exposure=False)
ccd_visit_id = packer.pack(data_id, returnMaxBits=False)
apdb = self._make_apdb()
# HACK: this method only works for ApdbSql; not needed after DM-41671
if not apdb.containsCcdVisit(ccd_visit_id):
state_changed = False
else:
# Don't know how this could happen, so won't try to handle it gracefully.
_log.warning("Unexpected visit ids: %s. Assuming APDB modified.", data_ids)
except Exception:
# Failure in registry or APDB queries
_log.exception("Could not determine APDB state, assuming modified.")
raise NonRetriableError("APDB potentially modified") from e
else:
if state_changed:
raise NonRetriableError("APDB modified") from e
else:
raise
break
else:
# TODO: a good place for a custom exception?
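
The error handling added above decides retriability by asking whether the APDB already shows any trace of this visit and detector, and assumes the worst if the registry or APDB query itself fails. A simplified restatement of that check as a standalone helper (illustrative only, not part of this PR; `butler`, `instrument`, and `apdb` stand in for the objects MiddlewareInterface already holds, and `where` is the same data ID constraint used by run_pipeline):

def apdb_was_modified(butler, instrument, apdb, where: str) -> bool:
    # Illustrative sketch, not part of this PR.
    data_ids = set(butler.registry.queryDataIds(
        ["instrument", "visit", "detector"], where=where).expanded())
    if len(data_ids) != 1:
        # Ambiguous or empty query; assume the APDB was modified, as the
        # handler above does.
        return True
    data_id = next(iter(data_ids))
    packer = instrument.make_default_dimension_packer(data_id, is_exposure=False)
    ccd_visit_id = packer.pack(data_id, returnMaxBits=False)
    # HACK noted above: containsCcdVisit only exists for ApdbSql (see DM-41671).
    return apdb.containsCcdVisit(ccd_visit_id)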
@@ -842,22 +879,20 @@ def export_outputs(self, exposure_ids: set[int]) -> None:
exposure_ids : `set` [`int`]
Identifiers of the exposures that were processed.
"""
exported = False
# Rather than determining which pipeline was run, just try to export all of them.
for pipeline_file in self._get_pipeline_files():
output_run = self._get_output_run(pipeline_file, self._day_obs)
exports = self._export_subset(exposure_ids,
# TODO: find a way to merge datasets like *_config
# or *_schema that are duplicated across multiple
# workers.
self._get_safe_dataset_types(self.butler),
in_collections=output_run,
)
if exports:
exported = True
_log.info(f"Pipeline products saved to collection '{output_run}'.")
if not exported:
raise ValueError(f"No datasets match visit={self.visit} and exposures={exposure_ids}.")
output_runs = [self._get_output_run(f, self._day_obs) for f in self._get_pipeline_files()]
exports = self._export_subset(exposure_ids,
# TODO: find a way to merge datasets like *_config
# or *_schema that are duplicated across multiple
# workers.
self._get_safe_dataset_types(self.butler),
in_collections=output_runs,
)
if exports:
populated_runs = {ref.run for ref in exports}
_log.info(f"Pipeline products saved to collections {populated_runs}.")
else:
_log.warning("No datasets match visit=%s and exposures=%s.", self.visit, exposure_ids)

@staticmethod
def _get_safe_dataset_types(butler):
107 changes: 107 additions & 0 deletions tests/test_exception.py
@@ -0,0 +1,107 @@
# This file is part of prompt_processing.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.


import unittest

from activator.exception import NonRetriableError, RetriableError


class NonRetriableErrorTest(unittest.TestCase):
def setUp(self):
super().setUp()

self.inner = RuntimeError("Foo!")

def test_raise_chained(self):
try:
raise NonRetriableError("Cannot compute!") from self.inner
except NonRetriableError as e:
self.assertIs(e.nested, self.inner)

def test_raise_context(self):
try:
try:
raise self.inner
except Exception:
raise NonRetriableError("Cannot compute!")
except NonRetriableError as e:
self.assertIs(e.nested, self.inner)

def test_raise_orphaned(self):
try:
raise NonRetriableError("Cannot compute!")
except NonRetriableError as e:
self.assertIs(e.nested, None)

try:
raise NonRetriableError("Cannot compute!") from None
except NonRetriableError as e:
self.assertIs(e.nested, None)

try:
try:
raise self.inner
except Exception:
raise NonRetriableError("Cannot compute!") from None
except NonRetriableError as e:
self.assertIs(e.nested, None)


class RetriableErrorTest(unittest.TestCase):
def setUp(self):
super().setUp()

self.inner = RuntimeError("Foo!")

def test_raise_chained(self):
try:
raise RetriableError("Cannot compute!") from self.inner
except RetriableError as e:
self.assertIs(e.nested, self.inner)

def test_raise_context(self):
try:
try:
raise self.inner
except Exception:
raise RetriableError("Cannot compute!")
except RetriableError as e:
self.assertIs(e.nested, self.inner)

def test_raise_orphaned(self):
try:
raise RetriableError("Cannot compute!")
except RetriableError as e:
self.assertIs(e.nested, None)

try:
raise RetriableError("Cannot compute!") from None
except RetriableError as e:
self.assertIs(e.nested, None)

try:
try:
raise self.inner
except Exception:
raise RetriableError("Cannot compute!") from None
except RetriableError as e:
self.assertIs(e.nested, None)