Merge pull request #401 from lsst/tickets/DM-42737
DM-42737: Switch back to butler.registry query system in QG generation.
TallJimbo committed Feb 14, 2024
2 parents a8697e8 + 6aea5bc commit a71a70a
Showing 9 changed files with 34 additions and 50 deletions.
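
For context, the diff below reverts quantum-graph generation from the newer `Butler._query` interface back to the long-standing `butler.registry` query methods. A minimal sketch of the restored query pattern follows; the repository path, dimensions, `where` expression, dataset type, and collections are placeholder values, not anything taken from this PR.

```python
# Illustrative sketch only: the registry-based query pattern this commit
# switches back to. The repo path, dimensions, where clause, dataset type,
# and collections are placeholders, not values from this PR.
from lsst.daf.butler import Butler

butler = Butler("REPO")  # hypothetical repository
with butler.registry.caching_context():  # caching context used in the new code path
    data_ids = butler.registry.queryDataIds(
        ["visit", "detector"],         # dimensions the quanta are built over
        where="instrument = 'HSC'",    # placeholder user expression
        datasets=["calexp"],           # optional dataset existence constraint
        collections=["HSC/defaults"],  # placeholder input collections
    )
    # Materialize once, then run follow-up dataset searches against the same
    # temporary results, as the builder does in the diff below.
    with data_ids.materialize() as common_data_ids:
        expanded = common_data_ids.expanded()
        for ref in expanded.findDatasets("calexp", ["HSC/defaults"]):
            print(ref.datasetType.name, dict(ref.dataId.required))
```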
16 changes: 0 additions & 16 deletions .github/workflows/lint.yaml

This file was deleted.

1 change: 1 addition & 0 deletions doc/changes/DM-42737.bugfix.md
@@ -0,0 +1 @@
Fix an incorrect count of previously-successful quanta in `QuantumGraphBuilder` logging.
14 changes: 8 additions & 6 deletions python/lsst/pipe/base/_quantumContext.py
@@ -226,12 +226,14 @@ def _put(self, value: Any, ref: DatasetRef) -> None:

     def get(
         self,
-        dataset: InputQuantizedConnection
-        | list[DatasetRef | None]
-        | list[DeferredDatasetRef | None]
-        | DatasetRef
-        | DeferredDatasetRef
-        | None,
+        dataset: (
+            InputQuantizedConnection
+            | list[DatasetRef | None]
+            | list[DeferredDatasetRef | None]
+            | DatasetRef
+            | DeferredDatasetRef
+            | None
+        ),
     ) -> Any:
         """Fetch data from the butler.
6 changes: 2 additions & 4 deletions python/lsst/pipe/base/_task_metadata.py
@@ -47,11 +47,9 @@ class PropertySetLike(Protocol):
     ``PropertySet`` to a `TaskMetadata`.
     """

-    def paramNames(self, topLevelOnly: bool = True) -> Collection[str]:
-        ...
+    def paramNames(self, topLevelOnly: bool = True) -> Collection[str]: ...

-    def getArray(self, name: str) -> Any:
-        ...
+    def getArray(self, name: str) -> Any: ...


 def _isListLike(v: Any) -> bool:
27 changes: 14 additions & 13 deletions python/lsst/pipe/base/all_dimensions_quantum_graph_builder.py
@@ -52,7 +52,8 @@
 )

 if TYPE_CHECKING:
-    from lsst.daf.butler import Butler, DataCoordinateQueryResults, DimensionGroup
+    from lsst.daf.butler import Butler, DimensionGroup
+    from lsst.daf.butler.registry.queries import DataCoordinateQueryResults
     from lsst.utils.logging import LsstLogAdapter

     from .pipeline_graph import DatasetTypeNode, PipelineGraph, TaskNode
@@ -230,7 +231,7 @@ def _find_followup_datasets(self, query: _AllDimensionsQuery, skeleton: QuantumG
 # to find these.
 count = 0
 try:
-    for ref in data_ids.find_datasets(dataset_type_node.name, self.input_collections):
+    for ref in data_ids.findDatasets(dataset_type_node.name, self.input_collections):
         self.existing_datasets.inputs[
             DatasetKey(dataset_type_node.name, ref.dataId.required_values)
         ] = ref
@@ -247,7 +248,7 @@ def _find_followup_datasets(self, query: _AllDimensionsQuery, skeleton: QuantumG
 # that we might skip...
 count = 0
 try:
-    for ref in data_ids.find_datasets(dataset_type_node.name, self.skip_existing_in):
+    for ref in data_ids.findDatasets(dataset_type_node.name, self.skip_existing_in):
         key = DatasetKey(dataset_type_node.name, ref.dataId.required_values)
         self.existing_datasets.outputs_for_skip[key] = ref
         count += 1
@@ -267,7 +268,7 @@ def _find_followup_datasets(self, query: _AllDimensionsQuery, skeleton: QuantumG
 # previous block).
 count = 0
 try:
-    for ref in data_ids.find_datasets(dataset_type_node.name, [self.output_run]):
+    for ref in data_ids.findDatasets(dataset_type_node.name, [self.output_run]):
         self.existing_datasets.outputs_in_the_way[
             DatasetKey(dataset_type_node.name, ref.dataId.required_values)
         ] = ref
@@ -337,7 +338,7 @@ def _find_followup_datasets(self, query: _AllDimensionsQuery, skeleton: QuantumG
 # IDs to the datasets we're looking for.
 count = 0
 try:
-    query_results = data_ids.find_related_datasets(
+    query_results = data_ids.findRelatedDatasets(
         finder.dataset_type_node.dataset_type, self.input_collections
     )
 except MissingDatasetTypeError:
@@ -383,9 +384,9 @@ class _AllDimensionsQuery:
     subgraph: PipelineGraph
     """Graph of this subset of the pipeline."""

-    grouped_by_dimensions: dict[
-        DimensionGroup, tuple[dict[str, TaskNode], dict[str, DatasetTypeNode]]
-    ] = dataclasses.field(default_factory=dict)
+    grouped_by_dimensions: dict[DimensionGroup, tuple[dict[str, TaskNode], dict[str, DatasetTypeNode]]] = (
+        dataclasses.field(default_factory=dict)
+    )
     """The tasks and dataset types of this subset of the pipeline, grouped
     by their dimensions.
@@ -457,7 +458,7 @@ def from_builder(
 result.query_args = {
     "dimensions": dimensions,
     "where": builder.where,
-    "data_id": result.subgraph.data_id,
+    "dataId": result.subgraph.data_id,
     "bind": builder.bind,
 }
 if builder.dataset_query_constraint == DatasetQueryConstraintVariant.ALL:
@@ -492,15 +493,15 @@ def from_builder(
 )
 builder.log.verbose("Querying for data IDs with arguments:")
 builder.log.verbose(" dimensions=%s,", list(result.query_args["dimensions"].names))
-builder.log.verbose(" data_id=%s,", dict(result.query_args["data_id"].required))
+builder.log.verbose(" dataId=%s,", dict(result.query_args["dataId"].required))
 if result.query_args["where"]:
     builder.log.verbose(" where=%s,", repr(result.query_args["where"]))
 if "datasets" in result.query_args:
     builder.log.verbose(" datasets=%s,", list(result.query_args["datasets"]))
 if "collections" in result.query_args:
     builder.log.verbose(" collections=%s,", list(result.query_args["collections"]))
-with builder.butler._query() as query:
-    with query.data_ids(**result.query_args).materialize() as common_data_ids:
+with builder.butler.registry.caching_context():
+    with builder.butler.registry.queryDataIds(**result.query_args).materialize() as common_data_ids:
         builder.log.debug("Expanding data IDs.")
         result.common_data_ids = common_data_ids.expanded()
         yield result
@@ -527,7 +528,7 @@ def log_failure(self, log: LsstLogAdapter) -> None:
 # so they can read it more easily and copy and paste into
 # a Python terminal.
 log.critical(" dimensions=%s,", list(self.query_args["dimensions"].names))
-log.critical(" data_id=%s,", dict(self.query_args["data_id"].required))
+log.critical(" dataId=%s,", dict(self.query_args["dataId"].required))
 if self.query_args["where"]:
     log.critical(" where=%s,", repr(self.query_args["where"]))
 if "datasets" in self.query_args:
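
The hunks above all make the same kind of change: follow-up dataset searches now go through the registry-style `findDatasets`/`findRelatedDatasets` methods on the materialized data-ID results, with `MissingDatasetTypeError` treated as "nothing found". A minimal sketch of that pattern, assuming `common_data_ids` is a registry `DataCoordinateQueryResults`; `find_existing_inputs` is a hypothetical helper, not a `pipe_base` API:

```python
# A sketch of the restored follow-up lookup; common_data_ids stands in for the
# builder's materialized data-ID query results, and find_existing_inputs is a
# hypothetical helper, not part of pipe_base.
from collections.abc import Sequence
from typing import Any

from lsst.daf.butler.registry import MissingDatasetTypeError


def find_existing_inputs(
    common_data_ids: Any, dataset_type_name: str, collections: Sequence[str]
) -> dict[tuple, Any]:
    """Collect already-existing datasets keyed by name and data ID values."""
    found: dict[tuple, Any] = {}
    try:
        # Registry-style camelCase spelling (findDatasets), restored by this
        # commit in place of the new query system's find_datasets.
        for ref in common_data_ids.findDatasets(dataset_type_name, collections):
            found[(dataset_type_name, ref.dataId.required_values)] = ref
    except MissingDatasetTypeError:
        # An unregistered dataset type just means there is nothing to find.
        pass
    return found
```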
6 changes: 3 additions & 3 deletions python/lsst/pipe/base/connectionTypes.py
@@ -385,9 +385,9 @@ class PrerequisiteInput(BaseInput):
     - Prerequisite inputs may be optional (regular inputs are never optional).
     """

-    lookupFunction: Callable[
-        [DatasetType, Registry, DataCoordinate, Sequence[str]], Iterable[DatasetRef]
-    ] | None = None
+    lookupFunction: (
+        Callable[[DatasetType, Registry, DataCoordinate, Sequence[str]], Iterable[DatasetRef]] | None
+    ) = None

     _connection_type_set: ClassVar[str] = "prerequisiteInputs"

6 changes: 2 additions & 4 deletions python/lsst/pipe/base/pipeline_graph/_mapping_views.py
@@ -186,12 +186,10 @@ def is_resolved(self, key: str) -> bool:
         return super().__getitem__(key) is not None

     @overload
-    def get_if_resolved(self, key: str) -> DatasetTypeNode | None:
-        ...  # pragma: nocover
+    def get_if_resolved(self, key: str) -> DatasetTypeNode | None: ...  # pragma: nocover

     @overload
-    def get_if_resolved(self, key: str, default: _T) -> DatasetTypeNode | _T:
-        ...  # pragma: nocover
+    def get_if_resolved(self, key: str, default: _T) -> DatasetTypeNode | _T: ...  # pragma: nocover

     def get_if_resolved(self, key: str, default: Any = None) -> DatasetTypeNode | Any:
         """Get a node or return a default if it has not been resolved.
6 changes: 3 additions & 3 deletions python/lsst/pipe/base/prerequisite_helpers.py
@@ -204,9 +204,9 @@ def __init__(
     dataset type.
     """

-    lookup_function: Callable[
-        [DatasetType, Registry, DataCoordinate, Sequence[str]], Iterable[DatasetRef]
-    ] | None
+    lookup_function: (
+        Callable[[DatasetType, Registry, DataCoordinate, Sequence[str]], Iterable[DatasetRef]] | None
+    )
     """A task-provided callback for finding these datasets.
     If this is not `None`, it must be used to ensure correct behavior.
2 changes: 1 addition & 1 deletion python/lsst/pipe/base/quantum_graph_builder.py
@@ -570,7 +570,7 @@ def _resolve_task_quanta(self, task_node: TaskNode, skeleton: QuantumGraphSkelet
 if no_work_quanta:
     message_terms.append(f"{len(no_work_quanta)} had no work to do")
 if skipped_quanta:
-    message_terms.append(f"{len(no_work_quanta)} previously succeeded")
+    message_terms.append(f"{len(skipped_quanta)} previously succeeded")
 message_parenthetical = f" ({', '.join(message_terms)})" if message_terms else ""
 if remaining_quanta:
     self.log.info(
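
This last hunk is the actual DM-42737 bugfix noted in the changelog: the skipped-quanta term was counting `no_work_quanta` instead of `skipped_quanta`. A small worked example of the corrected summary string, with made-up counts:

```python
# Worked example of the corrected summary string, with made-up quanta.
no_work_quanta = ["a"]         # quanta that had no work to do
skipped_quanta = ["b", "c"]    # quanta skipped because they already succeeded

message_terms = []
if no_work_quanta:
    message_terms.append(f"{len(no_work_quanta)} had no work to do")
if skipped_quanta:
    # Before this fix the line below used len(no_work_quanta), so the log
    # would have reported "1 previously succeeded" here.
    message_terms.append(f"{len(skipped_quanta)} previously succeeded")
message_parenthetical = f" ({', '.join(message_terms)})" if message_terms else ""
print(message_parenthetical)  # -> " (1 had no work to do, 2 previously succeeded)"
```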
