
Commit

Final documentation changes
eigerx committed Oct 11, 2023
1 parent 7ca69da commit 2dc01d1
Showing 2 changed files with 73 additions and 65 deletions.
3 changes: 3 additions & 0 deletions doc/changes/DM-37163.feature.rst
@@ -0,0 +1,3 @@
Add a manifest checker which walks an executed quantum graph to generate a
summary report containing information about produced DatasetTypes, missing data,
and failures.
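
A minimal usage sketch of the new report, assuming `make_reports` is invoked as a classmethod on `QuantumGraphExecutionReport` (as its return annotation in the diff below suggests); the repository path, collection name, quantum graph file, and output file name are all placeholders:

    from lsst.daf.butler import Butler
    from lsst.pipe.base.execution_reports import QuantumGraphExecutionReport

    # Placeholder repository, run collection, and executed quantum graph file.
    butler = Butler("/repo/main", collections=["u/someone/step1"])
    report = QuantumGraphExecutionReport.make_reports(butler, "executed.qgraph")
    report.write_summary_yaml(butler, "manifest_report.yaml", logs=True)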
135 changes: 70 additions & 65 deletions python/lsst/pipe/base/execution_reports.py
@@ -48,27 +48,25 @@ class DatasetTypeExecutionReport:
"""A report on the number of produced datasets as well as the status of
missing datasets based on metadata.
A DatasetTypeExecutionReport is created for each DatasetType in a
TaskExecutionReport.
See Also
--------
TaskExecutionReport
QuantumGraphExecutionReport
A `DatasetTypeExecutionReport` is created for each `DatasetType` in a
`TaskExecutionReport`.
"""

missing_failed: set[DatasetRef] = dataclasses.field(default_factory=set)
"""Datasets not produced because their quanta failed directly in this
run (`set`).
"""

missing_not_produced: dict[DatasetRef, bool] = dataclasses.field(default_factory=dict)
"""Missing datasets which were not produced due either missing inputs or a
failure in finding inputs (`dict`).
bool: were predicted inputs produced?
"""

missing_upstream_failed: set[DatasetRef] = dataclasses.field(default_factory=set)
"""Datasets not produced due to an upstream failure (`set`).
"""

n_produced: int = 0
"""Count of datasets produced (`int`).
"""
@@ -78,9 +76,11 @@ def to_summary_dict(self) -> dict[str, Any]:
Returns
-------
A count of the DatasetTypes with each outcome; the number of produced,
missing_failed, missing_not_produced, and missing_upstream_failed
DatasetTypes. See above for attribute descriptions.
summary_dict : `dict`
A count of the datasets with each outcome; the number of
produced, `missing_failed`, `missing_not_produced`, and
`missing_upstream_failed` `DatasetTypes`. See above for attribute
descriptions.
"""
return {
"produced": self.n_produced,
@@ -96,18 +96,14 @@ def handle_missing_dataset(
Parameters
----------
output_ref: `DatasetRef`
output_ref : `~lsst.daf.butler.DatasetRef`
Dataset reference of the missing dataset.
failed: `bool`
failed : `bool`
Whether the task associated with the missing dataset failed.
status_graph: `networkx.DiGraph`
The quantum graph produced by TaskExecutionReport.inspect_quantum
status_graph : `networkx.DiGraph`
The quantum graph produced by `TaskExecutionReport.inspect_quantum`
which steps through the run quantum graph and logs the status of
each quanta.
See Also
--------
TaskExecutionReport.inspect_quantum
"""
if failed:
for upstream_quantum_id in status_graph.predecessors(output_ref.id):
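
A standalone paraphrase of the decision flow implied by the fragment above and the attribute docstrings; the helper name and simplified boolean arguments are illustrative, not the verbatim implementation:

    def classify_missing_output(failed: bool, upstream_failed: bool) -> str:
        """Name the bucket a missing dataset falls into."""
        if failed:
            # The producing quantum failed; distinguish whether an upstream
            # quantum had already failed before it.
            return "missing_upstream_failed" if upstream_failed else "missing_failed"
        # The quantum did not fail outright: inputs were missing or not found.
        return "missing_not_produced"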
@@ -129,11 +125,11 @@ def handle_produced_dataset(self, output_ref: DatasetRef, status_graph: networkx
Parameters
----------
output_ref: `DatasetRef`
output_ref : `~lsst.daf.butler.DatasetRef`
Dataset reference of the dataset.
status_graph: `networkx.DiGraph`
status_graph : `networkx.DiGraph`
The quantum graph produced by
QuantumGraphExecutionReport.make_reports() which steps through the
`QuantumGraphExecutionReport.make_reports` which steps through the
quantum graph of a run and logs the status of each quantum.
See Also
@@ -162,12 +158,14 @@ class TaskExecutionReport:
"""A mapping from quantum data ID to log dataset reference for quanta that
failed directly in this run (`dict`).
"""

failed_upstream: dict[uuid.UUID, DataCoordinate] = dataclasses.field(default_factory=dict)
"""A mapping of data IDs of quanta that were not attempted due to an
upstream failure (`dict`).
"""

output_datasets: dict[str, DatasetTypeExecutionReport] = dataclasses.field(default_factory=dict)
"""Missing and produced outputs of each DatasetType (`dict`).
"""Missing and produced outputs of each `DatasetType` (`dict`).
"""

def inspect_quantum(
@@ -183,18 +181,20 @@ def inspect_quantum(
Parameters
----------
quantum_node: `QuantumNode`
quantum_node : `QuantumNode`
The specific node of the quantum graph to be inspected.
status_graph: `networkx.DiGraph`
status_graph : `networkx.DiGraph`
The quantum graph produced by
QuantumGraphExecutionReport.make_reports which steps through the
`QuantumGraphExecutionReport.make_reports` which steps through the
quantum graph of a run and logs the status of each quantum.
refs: `Mapping[str, Mapping[uuid.UUID, DatasetRef]]`
refs : `~collections.abc.Mapping` [ `str`,\
`~collections.abc.Mapping` [ `uuid.UUID`,\
`~lsst.daf.butler.DatasetRef` ] ]
The DatasetRefs of each of the DatasetTypes produced by the task.
Includes initialization, intermediate and output data products.
metadata_name: `str`
The metadataDataset name for the node.
log_name: `str`
metadata_name : `str`
The metadata dataset name for the node.
log_name : `str`
The name of the log files for the node.
See Also
@@ -235,22 +235,24 @@ def to_summary_dict(self, butler: Butler, logs: bool = True) -> dict[str, Any]:
Parameters
----------
butler: `lsst.daf.butler.Butler`
butler : `lsst.daf.butler.Butler`
The Butler used for this report.
logs: `bool`
logs : `bool`
Store the logs in the summary dictionary.
Returns
-------
A dictionary containing:
outputs: `dict`
A dictionary summarizing the DatasetTypeExecutionReport for
each DatasetType associated with the task
failed_quanta: `dict`
A dictionary of quanta which failed and their dataIDs by
quantum graph node id
n_quanta_blocked: `int`
The number of quanta which failed due to upstream failures.
summary_dict : `dict`
A dictionary containing:
- outputs: A dictionary summarizing the
DatasetTypeExecutionReport for each DatasetType associated with
the task
- failed_quanta: A dictionary of quanta which failed and their
dataIDs by quantum graph node id
- n_quanta_blocked: The number of quanta which failed due to
upstream failures.
"""
failed_quanta = {}
for node_id, log_ref in self.failed.items():
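
A hedged sketch of the per-task summary shape, using only the keys named in the docstring above; the task label is a placeholder and the values are illustrative:

    task_summary = report.tasks["isr"].to_summary_dict(butler, logs=False)
    # task_summary["outputs"]          -> per-DatasetType counts
    # task_summary["failed_quanta"]    -> failed quanta keyed by quantum graph node ID
    # task_summary["n_quanta_blocked"] -> number of quanta blocked by upstream failures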
@@ -290,8 +292,8 @@ class QuantumGraphExecutionReport:
Parameters
----------
tasks: `dict`
A dictionary of TaskExecutionReports by pipetask
tasks : `dict`
A dictionary of TaskExecutionReports by task label.
See Also
--------
@@ -300,38 +302,40 @@
"""

tasks: dict[str, TaskExecutionReport] = dataclasses.field(default_factory=dict)
"""A dictionary of TaskExecutionReports by pipetask (`dict`).
"""A dictionary of TaskExecutionReports by task label (`dict`).
"""

def to_summary_dict(self, butler: Butler, logs: bool = True) -> dict[str, Any]:
"""Summarize the results of the QuantumGraphExecutionReport in a
"""Summarize the results of the `QuantumGraphExecutionReport` in a
dictionary.
Parameters
----------
butler: `lsst.daf.butler.Butler`
butler : `lsst.daf.butler.Butler`
The Butler used for this report.
logs: `bool`
logs : `bool`
Store the logs in the summary dictionary.
Returns
-------
A dictionary containing a TaskExecutionReport for each task in the
quantum graph.
summary_dict : `dict`
A dictionary containing a summary of a `TaskExecutionReport` for
each task in the quantum graph.
"""
return {task: report.to_summary_dict(butler, logs=logs) for task, report in self.tasks.items()}

def write_summary_yaml(self, butler: Butler, filename: str, logs: bool = True) -> None:
"""Take the dictionary from QuantumGraphExecutionReport.to_summary_dict
and store its contents in a yaml file.
"""Take the dictionary from
`QuantumGraphExecutionReport.to_summary_dict` and store its contents in
a yaml file.
Parameters
----------
butler: `lsst.daf.butler.Butler`
butler : `lsst.daf.butler.Butler`
The Butler used for this report.
filename: `str`
filename : `str`
The name to be used for the summary yaml file.
logs: `bool`
logs : `bool`
Store the logs in the summary dictionary.
"""
with open(filename, "w") as stream:
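
A hedged usage sketch of writing and re-reading the summary; the file name and the use of `yaml.safe_load` are assumptions for illustration:

    import yaml

    report.write_summary_yaml(butler, "qg_report.yaml", logs=False)
    with open("qg_report.yaml") as f:
        summary = yaml.safe_load(f)
    print(list(summary))  # expected: one key per task label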
@@ -343,28 +347,28 @@ def make_reports(
butler: Butler,
graph: QuantumGraph | ResourcePathExpression,
) -> QuantumGraphExecutionReport:
"""Make a QuantumGraphExecutionReport.
"""Make a `QuantumGraphExecutionReport`.
Step through the quantum graph associated with a run, creating a
networkx.DiGraph called status_graph to annotate the status of each
`networkx.DiGraph` called status_graph to annotate the status of each
quantum node. For each task in the quantum graph, use
TaskExecutionReport.inspect_quantum to make a TaskExecutionReport based
on the status of each node. Return a TaskExecutionReport for each task
in the quantum graph.
`TaskExecutionReport.inspect_quantum` to make a `TaskExecutionReport`
based on the status of each node. Return a `TaskExecutionReport` for
each task in the quantum graph.
Parameters
----------
butler: `lsst.daf.butler.Butler`
butler : `lsst.daf.butler.Butler`
The Butler used for this report. This should match the Butler used
for the run associated with the executed quantum graph.
graph: `QuantumGraph` | `ResourcePathExpression`
graph : `QuantumGraph` | `ResourcePathExpression`
Either the associated quantum graph object or the uri of the
location of said quantum graph.
Returns
-------
report: `QuantumGraphExecutionReport`
The TaskExecutionReport for each task in the quantum graph.
The `TaskExecutionReport` for each task in the quantum graph.
"""
refs = {} # type: dict[str, Any]
status_graph = networkx.DiGraph()
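
A minimal sketch of the status-graph bookkeeping idea described above; the node names and the "failed" attribute here are assumptions for illustration, not the exact annotation scheme:

    import networkx

    g = networkx.DiGraph()
    g.add_node("quantum-1", failed=True)      # a quantum node annotated with status
    g.add_node("dataset-a")                   # one of its output datasets
    g.add_edge("quantum-1", "dataset-a")      # producer -> product edge
    print(list(g.predecessors("dataset-a")))  # walk back to the producing quantum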
@@ -424,15 +428,16 @@ def lookup_quantum_data_id(
Parameters
----------
graph_uri: `ResourcePathExpression`
graph_uri : `ResourcePathExpression`
URI of the quantum graph of the run.
nodes: `Sequence[uuid.UUID]`
nodes : `~collections.abc.Iterable` [ `uuid.UUID` ]
Quantum graph nodeID.
Returns
-------
A list of human-readable dataIDs which map to the nodeIDs on the quantum
graph at graph_uri.
data_ids : `list` [ `lsst.daf.butler.DataCoordinate` ]
A list of human-readable dataIDs which map to the nodeIDs on the
quantum graph at graph_uri.
"""
qg = QuantumGraph.loadUri(graph_uri, nodes=nodes)

Codecov warning: added line #L442 in python/lsst/pipe/base/execution_reports.py was not covered by tests.
return [qg.getQuantumNodeByNodeId(node).quantum.dataId for node in nodes]
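
A hedged usage sketch; the graph URI and node ID are placeholders, and the function is assumed to be importable from `lsst.pipe.base.execution_reports`:

    import uuid

    from lsst.pipe.base.execution_reports import lookup_quantum_data_id

    node_id = uuid.UUID(int=0)  # substitute a real node ID from a report
    data_ids = lookup_quantum_data_id("executed.qgraph", [node_id])
    print(data_ids)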
