Skip to content

Commit

Permalink
Use callbacks to report success/failure in ingest.
Browse files Browse the repository at this point in the history
  • Loading branch information
TallJimbo committed Mar 5, 2021
1 parent 5f9221c commit 07b3e45
Showing 1 changed file with 46 additions and 5 deletions.
51 changes: 46 additions & 5 deletions python/lsst/obs/base/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

import os.path
from dataclasses import dataclass, InitVar
from typing import List, Iterator, Iterable, Type, Optional, Any
from typing import Callable, List, Iterator, Iterable, Tuple, Type, Optional, Any
from collections import defaultdict
from multiprocessing import Pool

Expand All @@ -48,6 +48,13 @@
from ._fitsRawFormatterBase import FitsRawFormatterBase


def _do_nothing(*args, **kwargs) -> None:
"""A function that accepts anything and does nothing, for use as a default
in callback arguments.
"""
pass


@dataclass
class RawFileDatasetInfo:
"""Structure that holds information about a single dataset within a
Expand Down Expand Up @@ -175,6 +182,26 @@ class RawIngestTask(Task):
Writeable butler instance, with ``butler.run`` set to the appropriate
`~lsst.daf.butler.CollectionType.RUN` collection for these raw
datasets.
on_success : `Callable`, optional
A callback invoked when all of the raws associated with an exposure
are ingested. Will be passed a list of `FileDataset` objects, each
containing one or more resolved `DatasetRef` objects. If this callback
raises it will interrupt the entire ingest process, even if
`RawIngestConfig.failFast` is `False`.
on_metadata_failure : `Callable`, optional
A callback invoked when a failure occurs trying to translate the
metadata for a file. Will be passed the filename and the exception, in
that order, as positional arguments. Guaranteed to be called in an
``except`` block, allowing the callback to re-raise or replace (with
``raise ... from``) to override the task's usual error handling (before
`RawIngestConfig.failFast` logic occurs).
on_ingest_failure : `Callable`, optional
A callback invoked when dimension record or dataset insertion into the
database fails for an exposure. Will be passed a `RawExposureData`
instance and the exception, in that order, as positional arguments.
Guaranteed to be called in an ``except`` block, allowing the callback
to re-raise or replace (with ``raise ... from``) to override the task's
usual error handling (before `RawIngestConfig.failFast` logic occurs).
**kwargs
Additional keyword arguments are forwarded to the `lsst.pipe.base.Task`
constructor.
Expand All @@ -195,20 +222,28 @@ def getDatasetType(self):
return DatasetType("raw", ("instrument", "detector", "exposure"), "Exposure",
universe=self.butler.registry.dimensions)

def __init__(self, config: Optional[RawIngestConfig] = None, *, butler: Butler, **kwargs: Any):
def __init__(self, config: Optional[RawIngestConfig] = None, *, butler: Butler,
on_success: Callable[[List[FileDataset]], Any] = _do_nothing,
on_metadata_failure: Callable[[str, Exception], Any] = _do_nothing,
on_ingest_failure: Callable[[RawExposureData, Exception], Any] = _do_nothing,
**kwargs: Any):
config.validate() # Not a CmdlineTask nor PipelineTask, so have to validate the config here.
super().__init__(config, **kwargs)
self.butler = butler
self.universe = self.butler.registry.dimensions
self.datasetType = self.getDatasetType()
self._on_success = on_success
self._on_metadata_failure = on_metadata_failure
self._on_ingest_failure = on_ingest_failure

# Import all the instrument classes so that we ensure that we
# have all the relevant metadata translators loaded.
Instrument.importAll(self.butler.registry)

def _reduce_kwargs(self):
# Add extra parameters to pickle
return dict(**super()._reduce_kwargs(), butler=self.butler)
return dict(**super()._reduce_kwargs(), butler=self.butler, on_success=self._on_success,
on_metadata_failure=self._on_metadata_failure, on_ingest_failure=self._on_ingest_failure)

def extractMetadata(self, filename: str) -> RawFileData:
"""Extract and process metadata from a single raw file.
Expand Down Expand Up @@ -249,6 +284,7 @@ def extractMetadata(self, filename: str) -> RawFileData:
datasets = []
FormatterClass = Formatter
instrument = None
self._on_metadata_failure(filename, e)
if self.config.failFast:
raise RuntimeError(f"Problem extracting metadata from file {filename}") from e
else:
Expand All @@ -259,6 +295,7 @@ def extractMetadata(self, filename: str) -> RawFileData:
try:
instrument = Instrument.fromName(datasets[0].dataId["instrument"], self.butler.registry)
except LookupError as e:
self._on_metadata_failure(filename, e)
self.log.warning("Instrument %s for file %s not known to registry",
datasets[0].dataId["instrument"], filename)
if self.config.failFast:
Expand Down Expand Up @@ -554,6 +591,7 @@ def run(self, files, *, pool: Optional[Pool] = None, processes: int = 1, run: Op
try:
self.butler.registry.syncDimensionData("exposure", exposure.record)
except Exception as e:
self._on_ingest_failure(exposure, e)
n_exposures_failed += 1
self.log.warning("Exposure %s:%s could not be registered: %s",
exposure.record.instrument, exposure.record.obs_id, e)
Expand All @@ -573,16 +611,19 @@ def run(self, files, *, pool: Optional[Pool] = None, processes: int = 1, run: Op
try:
with self.butler.transaction():
datasets_for_exposure = self.ingestExposureDatasets(exposure, run=this_run)
for dataset in datasets_for_exposure:
refs.extend(dataset.refs)
except Exception as e:
self._on_ingest_failure(exposure, e)
n_ingests_failed += 1
self.log.warning("Failed to ingest the following for reason: %s", e)
for f in exposure.files:
self.log.warning("- %s", f.filename)
if self.config.failFast:
raise e
continue
else:
self._on_success(datasets_for_exposure)
for dataset in datasets_for_exposure:
refs.extend(dataset.refs)

# Success for this exposure
n_exposures += 1
Expand Down

0 comments on commit 07b3e45

Please sign in to comment.