Skip to content

Commit

Permalink
Catch concurrency errors in output chain definition.
Browse files Browse the repository at this point in the history
The output chain strictly only needs to be handled by one pod, and is
not a core feature of the export. Failure to update it should not be
treated as a failure of the whole pipeline.
  • Loading branch information
kfindeisen committed Mar 27, 2024
1 parent 11940de commit 94c8ce5
Showing 1 changed file with 11 additions and 2 deletions.
13 changes: 11 additions & 2 deletions python/activator/middleware_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import typing

import astropy
import sqlalchemy

import lsst.utils.timer
from lsst.resources import ResourcePath
Expand Down Expand Up @@ -1273,8 +1274,16 @@ def _chain_exports(self, output_chain: str, output_runs: collections.abc.Iterabl
self.central_butler.registry.refresh()
self.central_butler.registry.registerCollection(output_chain, CollectionType.CHAINED)

with self.central_butler.transaction():
_prepend_collection(self.central_butler, output_chain, output_runs)
try:
with self.central_butler.transaction():
_prepend_collection(self.central_butler, output_chain, output_runs)
except sqlalchemy.exc.IntegrityError as e:
# HACK: I don't know of a better way to distinguish exceptions
# blended by SQLAlchemy. To be removed on DM-43316.
if 'duplicate key value violates unique constraint "collection_chain_pkey"' in str(e):
_log.error("Failed to update output chain, continuing export.")
else:
raise

def _query_datasets_by_storage_class(self, butler, exposure_ids, collections, storage_class):
"""Identify all datasets with a particular storage class, regardless of
Expand Down

0 comments on commit 94c8ce5

Please sign in to comment.