Skip to content

Commit

Permalink
Fix final mypy linter failures
Browse files Browse the repository at this point in the history
  • Loading branch information
eigerx committed Oct 10, 2023
1 parent dd90afd commit cabb4e2
Showing 1 changed file with 11 additions and 7 deletions.
18 changes: 11 additions & 7 deletions python/lsst/pipe/base/execution_reports.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import itertools
import logging
import uuid
from collections.abc import Iterable
from collections.abc import Iterable, Mapping
from typing import Any

import networkx
Expand Down Expand Up @@ -174,7 +174,7 @@ def inspect_quantum(
self,
quantum_node: QuantumNode,
status_graph: networkx.DiGraph,
refs: Iterable[DatasetRef],
refs: Mapping[str, Mapping[uuid.UUID, DatasetRef]],
metadata_name: str,
log_name: str,
) -> None:
Expand All @@ -189,7 +189,7 @@ def inspect_quantum(
The quantum graph produced by
QuantumGraphExecutionReport.make_reports which steps through the
quantum graph of a run and logs the status of each quantum.
refs: `Iterable[DatasetRef]`
refs: `Mapping[str, Mapping[uuid.UUID, DatasetRef]]`
The DatasetRefs of each of the DatasetTypes produced by the task.
Includes initialization, intermediate and output data products.
metadata_name: `str`
Expand All @@ -212,6 +212,7 @@ def inspect_quantum(
for upstream_dataset_id in status_graph.predecessors(quantum_node.nodeId)
for upstream_quantum_id in status_graph.predecessors(upstream_dataset_id)
):
assert quantum.dataId is not None
self.failed_upstream[quantum_node.nodeId] = quantum.dataId
else:
self.failed[quantum_node.nodeId] = log_ref
Expand Down Expand Up @@ -253,7 +254,7 @@ def to_summary_dict(self, butler: Butler, logs: bool = True) -> dict[str, Any]:
"""
failed_quanta = {}
for node_id, log_ref in self.failed.items():
quantum_info = {"data_id": log_ref.dataId.byName()}
quantum_info: dict[str, Any] = {"data_id": log_ref.dataId.byName()}
if logs:
try:
log = butler.get(log_ref)
Expand Down Expand Up @@ -365,12 +366,13 @@ def make_reports(
report: `QuantumGraphExecutionReport`
The TaskExecutionReport for each task in the quantum graph.
"""
refs = {}
refs = {} # type: dict[str, Any]
status_graph = networkx.DiGraph()
if not isinstance(graph, QuantumGraph):
qg = QuantumGraph.loadUri(graph)

Check warning on line 372 in python/lsst/pipe/base/execution_reports.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/execution_reports.py#L372

Added line #L372 was not covered by tests
else:
qg = graph
assert qg.metadata is not None, "Saved QGs always have metadata."
collection = qg.metadata["output_run"]
report = cls()
task_defs = list(qg.iterTaskGraph())
Expand All @@ -397,6 +399,8 @@ def make_reports(

for task_def in qg.iterTaskGraph():
task_report = TaskExecutionReport()
if task_def.logOutputDatasetName is None:
raise RuntimeError("QG must have log outputs to use execution reports.")

Check warning on line 403 in python/lsst/pipe/base/execution_reports.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/execution_reports.py#L403

Added line #L403 was not covered by tests
for node in qg.getNodesForTask(task_def):
task_report.inspect_quantum(
node,
Expand All @@ -414,15 +418,15 @@ def __str__(self) -> str:

def lookup_quantum_data_id(
graph_uri: ResourcePathExpression, nodes: Iterable[uuid.UUID]
) -> list[DataCoordinate]:
) -> list[DataCoordinate | None]:
"""Look up a dataId from a quantum graph and a list of quantum graph
nodeIDs.
Parameters
----------
graph_uri: `ResourcePathExpression`
URI of the quantum graph of the run.
nodes: `Iterable[uuid.UUID]`
nodes: `Sequence[uuid.UUID]`
Quantum graph nodeID.
Returns
Expand Down

0 comments on commit cabb4e2

Please sign in to comment.