6 changes: 6 additions & 0 deletions doc/changes/DM-53019.feature.md
@@ -0,0 +1,6 @@
Improve provenance tracking for failed quanta and retries.

By storing extra information in the log datasets written during execution,
we can record caught exceptions, track which other quanta have
already executed in the same process, and keep track of previous attempts to
run the same quantum.
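
As a hedged illustration of what that extra information can carry (field names follow the `_ExecutionLogRecordsExtra` model added in `log_capture.py` below; all values here are invented):

```python
# Invented example of the provenance payload appended to a quantum's
# log dataset after a failed attempt.
provenance_extra = {
    "exception": {
        "type_name": "builtins.ValueError",
        "message": "bad pixel",
        "metadata": {"n_bad": 12},
    },
    # IDs of other quanta already executed in the same process.
    "previous_process_quanta": ["<uuid>"],
    # Filled in when the same quantum is retried within the same RUN.
    "previous_attempts": [],
}
```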
167 changes: 156 additions & 11 deletions python/lsst/pipe/base/_status.py
@@ -27,28 +27,37 @@

from __future__ import annotations

__all__ = (
"AlgorithmError",
"AnnotatedPartialOutputsError",
"ExceptionInfo",
"InvalidQuantumError",
"NoWorkFound",
"QuantumAttemptStatus",
"QuantumSuccessCaveats",
"RepeatableQuantumError",
"UnprocessableDataError",
"UpstreamFailureNoWorkFound",
)

import abc
import enum
import logging
import sys
from typing import TYPE_CHECKING, Any, ClassVar, Protocol

import pydantic

from lsst.utils import introspection
from lsst.utils.logging import LsstLogAdapter, getLogger

from ._task_metadata import GetSetDictMetadata, NestedMetadataDict

if TYPE_CHECKING:
from lsst.utils.logging import LsstLogAdapter
from ._task_metadata import TaskMetadata

__all__ = (
"AlgorithmError",
"AnnotatedPartialOutputsError",
"InvalidQuantumError",
"NoWorkFound",
"QuantumSuccessCaveats",
"RepeatableQuantumError",
"UnprocessableDataError",
"UpstreamFailureNoWorkFound",
)

_LOG = getLogger(__name__)


class QuantumSuccessCaveats(enum.Flag):
@@ -175,6 +184,142 @@ def legend() -> dict[str, str]:
}


class ExceptionInfo(pydantic.BaseModel):
"""Information about an exception that was raised."""

type_name: str
"""Fully-qualified Python type name for the exception raised."""

message: str
"""String message included in the exception."""

metadata: dict[str, float | int | str | bool | None]
"""Additional metadata included in the exception."""

@classmethod
def _from_metadata(cls, md: TaskMetadata) -> ExceptionInfo:
"""Construct from task metadata.

Parameters
----------
md : `TaskMetadata`
Metadata about the error, as written by
`AnnotatedPartialOutputsError`.

Returns
-------
info : `ExceptionInfo`
Information about the exception.
"""
result = cls(type_name=md["type"], message=md["message"], metadata={})
if "metadata" in md:
raw_err_metadata = md["metadata"].to_dict()
for k, v in raw_err_metadata.items():
# Guard against error metadata we wouldn't be able to serialize
# later via Pydantic; don't want one weird value bringing down
# our ability to report on an entire run.
if isinstance(v, float | int | str | bool):
result.metadata[k] = v
Review comment (Contributor): Might we want a debugging message about the skipped metadata?

else:
_LOG.debug(
"Not propagating nested or JSON-incompatible exception metadata key %s=%r.", k, v
)
return result
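
For illustration, a minimal sketch of the scalar-only guard above, using a plain dict as a stand-in for the `TaskMetadata.to_dict()` output (the values are hypothetical):

```python
# Hypothetical stand-in for md["metadata"].to_dict().
raw_err_metadata = {"n_sources": 42, "chi2": 1.7, "note": "saturated", "nested": {"a": 1}}

# Same guard as in _from_metadata: keep only JSON-friendly scalars.
kept = {k: v for k, v in raw_err_metadata.items() if isinstance(v, float | int | str | bool)}
assert kept == {"n_sources": 42, "chi2": 1.7, "note": "saturated"}  # "nested" is dropped
```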

# Work around the fact that Sphinx chokes on Pydantic docstring formatting,
# when we inherit those docstrings in our public classes.
if "sphinx" in sys.modules and not TYPE_CHECKING:

def copy(self, *args: Any, **kwargs: Any) -> Any:
"""See `pydantic.BaseModel.copy`."""
return super().copy(*args, **kwargs)

def model_dump(self, *args: Any, **kwargs: Any) -> Any:
"""See `pydantic.BaseModel.model_dump`."""
return super().model_dump(*args, **kwargs)

def model_dump_json(self, *args: Any, **kwargs: Any) -> Any:
"""See `pydantic.BaseModel.model_dump_json`."""
return super().model_dump_json(*args, **kwargs)

def model_copy(self, *args: Any, **kwargs: Any) -> Any:
"""See `pydantic.BaseModel.model_copy`."""
return super().model_copy(*args, **kwargs)

@classmethod
def model_construct(cls, *args: Any, **kwargs: Any) -> Any: # type: ignore[misc, override]
"""See `pydantic.BaseModel.model_construct`."""
return super().model_construct(*args, **kwargs)

@classmethod
def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any:
"""See `pydantic.BaseModel.model_json_schema`."""
return super().model_json_schema(*args, **kwargs)

@classmethod
def model_validate(cls, *args: Any, **kwargs: Any) -> Any:
"""See `pydantic.BaseModel.model_validate`."""
return super().model_validate(*args, **kwargs)

@classmethod
def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any:
"""See `pydantic.BaseModel.model_validate_json`."""
return super().model_validate_json(*args, **kwargs)

@classmethod
def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any:
"""See `pydantic.BaseModel.model_validate_strings`."""
return super().model_validate_strings(*args, **kwargs)


class QuantumAttemptStatus(enum.Enum):
"""Enum summarizing an attempt to run a quantum."""

UNKNOWN = -3
"""The status of this attempt is unknown.

This usually means no logs or metadata were written; at minimum, it could
not be determined whether the quantum was blocked by an upstream failure
(if it was definitely blocked, `BLOCKED` is set instead).
"""

LOGS_MISSING = -2
"""Task metadata was written for this attempt but logs were not.

This is a rare condition that requires a hard failure (i.e. the kind that
can prevent a ``finally`` block from running or I/O from being durable) at
a very precise time.
"""

FAILED = -1
"""Execution of the quantum failed.

This is always set if the task metadata dataset was not written but logs
were, as is the case when a Python exception is caught and handled by the
execution system. It may also be set in cases where logs were not written
either, but other information was available (e.g. from higher-level
orchestration tooling) to mark it as a failure.
"""

BLOCKED = 0
"""This quantum was not executed because an upstream quantum failed.

Upstream quanta with status `UNKNOWN` or `FAILED` are considered blockers;
`LOGS_MISSING` is not.
"""

SUCCESSFUL = 1
"""This quantum was successfully executed.

Quanta may be considered successful even if they do not write any outputs
or shortcut early by raising `NoWorkFound` or one of its variants. They
may even be considered successful if they raise
`AnnotatedPartialOutputsError`, provided the executor is configured to
treat that exception as a non-failure. See `QuantumSuccessCaveats` for
details on how these "successes with caveats" are reported.
"""


class GetSetDictMetadataHolder(Protocol):
"""Protocol for objects that have a ``metadata`` attribute that satisfies
`GetSetDictMetadata`.
105 changes: 98 additions & 7 deletions python/lsst/pipe/base/log_capture.py
@@ -29,28 +29,105 @@

__all__ = ["LogCapture"]

import dataclasses
import logging
import os
import shutil
import tempfile
import uuid
from collections.abc import Iterator
from contextlib import contextmanager, suppress
from logging import FileHandler

from lsst.daf.butler import Butler, FileDataset, LimitedButler, Quantum
from lsst.daf.butler.logging import ButlerLogRecordHandler, ButlerLogRecords, ButlerMDC, JsonLogFormatter
import pydantic

from ._status import InvalidQuantumError
from lsst.daf.butler import Butler, FileDataset, LimitedButler, Quantum
from lsst.daf.butler.logging import (
ButlerLogRecord,
ButlerLogRecordHandler,
ButlerLogRecords,
ButlerMDC,
JsonLogFormatter,
)

from ._status import ExceptionInfo, InvalidQuantumError
from ._task_metadata import TaskMetadata
from .automatic_connection_constants import METADATA_OUTPUT_TEMPLATE
from .pipeline_graph import TaskNode

_LOG = logging.getLogger(__name__)


class _LogCaptureFlag:
"""Simple flag to enable/disable log-to-butler saving."""
class _ExecutionLogRecordsExtra(pydantic.BaseModel):
"""Extra information about a quantum's execution stored with logs.

This middleware-private model includes information that is not directly
available via any public interface, as it is used exclusively for
provenance extraction and then made available through the provenance
quantum graph.
"""

exception: ExceptionInfo | None = None
"""Exception information for this quantum, if it failed.
"""

metadata: TaskMetadata | None = None
"""Metadata for this quantum, if it failed.

Metadata datasets are written if and only if a quantum succeeds, but we
still want to capture metadata from failed attempts, so we store it in the
log dataset. This field is always `None` when the quantum succeeds,
because in that case the metadata is already stored separately.
"""

previous_process_quanta: list[uuid.UUID] = pydantic.Field(default_factory=list)
"""The IDs of other quanta previously executed in the same process as this
one.
"""

logs: list[ButlerLogRecord] = pydantic.Field(default_factory=list)
"""Logs for this attempt.

This is always empty for the most recent attempt, whose logs are stored
in the main section of the butler log records.
"""

previous_attempts: list[_ExecutionLogRecordsExtra] = pydantic.Field(default_factory=list)
"""Information about previous attempts to run this task within the same
`~lsst.daf.butler.CollectionType.RUN` collection.

This is always empty for any attempt other than the most recent one,
as all previous attempts are flattened into one list.
"""

def attach_previous_attempt(self, log_records: ButlerLogRecords) -> None:
"""Attach logs from a previous attempt to this struct.

Parameters
----------
log_records : `ButlerLogRecords`
Logs from a past attempt to run a quantum.
"""
previous = self.model_validate(log_records.extra)
previous.logs.extend(log_records)
self.previous_attempts.extend(previous.previous_attempts)
self.previous_attempts.append(previous)
previous.previous_attempts.clear()
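
A hedged round-trip sketch of this model (values invented): dump the extra payload the way `capture_logging` below does, then validate it back the way `attach_previous_attempt` does:

```python
import json
import uuid

extra = _ExecutionLogRecordsExtra(
    exception=ExceptionInfo(type_name="builtins.ValueError", message="bad pixel", metadata={}),
    previous_process_quanta=[uuid.uuid4()],
)
# Serialized as in capture_logging below.
payload = extra.model_dump_json(exclude_unset=True, exclude_defaults=True)
# Validated back as in attach_previous_attempt.
restored = _ExecutionLogRecordsExtra.model_validate(json.loads(payload))
assert restored.exception is not None and restored.exception.type_name == "builtins.ValueError"
```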


@dataclasses.dataclass
class _LogCaptureContext:
"""Controls for log capture returned by the `LogCapture.capture_logging`
context manager.
"""

store: bool = True
"""Whether to store logs at all."""

extra: _ExecutionLogRecordsExtra = dataclasses.field(default_factory=_ExecutionLogRecordsExtra)
"""Extra information about the quantum's execution to store for provenance
extraction.
"""


class LogCapture:
@@ -88,7 +165,7 @@ def from_full(cls, butler: Butler) -> LogCapture:
return cls(butler, butler)

@contextmanager
def capture_logging(self, task_node: TaskNode, /, quantum: Quantum) -> Iterator[_LogCaptureFlag]:
def capture_logging(self, task_node: TaskNode, /, quantum: Quantum) -> Iterator[_LogCaptureContext]:
"""Configure logging system to capture logs for execution of this task.

Parameters
@@ -121,7 +198,7 @@ def capture_logging(self, task_node: TaskNode, /, quantum: Quantum) -> Iterator[
metadata_ref = quantum.outputs[METADATA_OUTPUT_TEMPLATE.format(label=task_node.label)][0]
mdc["RUN"] = metadata_ref.run

ctx = _LogCaptureFlag()
ctx = _LogCaptureContext()
log_dataset_name = (
task_node.log_output.dataset_type_name if task_node.log_output is not None else None
)
@@ -154,6 +231,12 @@ def capture_logging(self, task_node: TaskNode, /, quantum: Quantum) -> Iterator[
# Ensure that the logs are stored in butler.
logging.getLogger().removeHandler(log_handler_file)
log_handler_file.close()
if ctx.extra:
with open(log_file, "a") as log_stream:
ButlerLogRecords.write_streaming_extra(
log_stream,
ctx.extra.model_dump_json(exclude_unset=True, exclude_defaults=True),
)
if ctx.store:
self._ingest_log_records(quantum, log_dataset_name, log_file)
shutil.rmtree(tmpdir, ignore_errors=True)
@@ -165,7 +248,15 @@ def capture_logging(self, task_node: TaskNode, /, quantum: Quantum) -> Iterator[
try:
with ButlerMDC.set_mdc(mdc):
yield ctx
except:
# Re-raise unchanged; this clause exists only so the `else`
# block below can distinguish success.
raise
else:
# If the quantum succeeded, we don't need to save the
# metadata in the logs, because it has already been
# written as the separate metadata dataset.
ctx.extra.metadata = None
finally:
log_handler_memory.records.extra = ctx.extra.model_dump()
# Ensure that the logs are stored in butler.
logging.getLogger().removeHandler(log_handler_memory)
if ctx.store:
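
For orientation, a hedged executor-side usage sketch (`butler`, `task_node`, `quantum`, and `run_quantum` are assumed or hypothetical stand-ins; recording the exception on `ctx.extra` is one plausible way to populate the provenance fields above):

```python
capture = LogCapture.from_full(butler)
with capture.capture_logging(task_node, quantum) as ctx:
    try:
        run_quantum(task_node, quantum)  # hypothetical execution call
    except Exception as exc:
        # Record the failure for provenance before re-raising; the
        # context manager appends ctx.extra to the stored log dataset.
        cls = type(exc)
        ctx.extra.exception = ExceptionInfo(
            type_name=f"{cls.__module__}.{cls.__qualname__}",
            message=str(exc),
            metadata={},
        )
        raise
```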
6 changes: 6 additions & 0 deletions python/lsst/pipe/base/quantum_graph/_common.py
@@ -60,6 +60,7 @@
import zstandard

from lsst.daf.butler import DataCoordinate, DataIdValue
from lsst.daf.butler._rubin import generate_uuidv7
from lsst.resources import ResourcePath, ResourcePathExpression

from ..pipeline_graph import DatasetTypeNode, Edge, PipelineGraph, TaskImportMode, TaskNode
@@ -157,6 +158,11 @@ class HeaderModel(pydantic.BaseModel):
quantum graph file).
"""

provenance_dataset_id: uuid.UUID = pydantic.Field(default_factory=generate_uuidv7)
"""The dataset ID for provenance quantum graph when it is ingested into
a butler repository.
"""

@classmethod
def from_old_quantum_graph(cls, old_quantum_graph: QuantumGraph) -> HeaderModel:
"""Extract a header from an old `QuantumGraph` instance.
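
A minimal, hedged analogue of the new field (`_HeaderSketch` is invented; it only demonstrates the `default_factory` behavior):

```python
import uuid

import pydantic

from lsst.daf.butler._rubin import generate_uuidv7


class _HeaderSketch(pydantic.BaseModel):
    # Each instance is minted a fresh, time-sortable UUIDv7 at creation
    # time, so the provenance dataset ID is fixed when the graph is
    # built rather than when it is ingested.
    provenance_dataset_id: uuid.UUID = pydantic.Field(default_factory=generate_uuidv7)


a, b = _HeaderSketch(), _HeaderSketch()
assert a.provenance_dataset_id != b.provenance_dataset_id  # fresh ID per header
```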