Merge pull request #386 from lsst/tickets/DM-34340
DM-34340: Adapt to daf_butler API changes.
TallJimbo committed Nov 22, 2023
2 parents b64b5b1 + 41d1e04 commit a1e3b54
Showing 23 changed files with 121 additions and 126 deletions.
4 changes: 2 additions & 2 deletions python/lsst/pipe/base/_dataset_handle.py
@@ -44,7 +44,7 @@

# Use an empty dataID as a default.
def _default_dataId() -> DataCoordinate:
- return DataCoordinate.makeEmpty(DimensionUniverse())
+ return DataCoordinate.make_empty(DimensionUniverse())


@dataclasses.dataclass(frozen=True, init=False)
@@ -56,7 +56,7 @@ class InMemoryDatasetHandle:
parameters will be converted into a dataId-like entity.
"""

- _empty = DataCoordinate.makeEmpty(DimensionUniverse())
+ _empty = DataCoordinate.make_empty(DimensionUniverse())

def __init__(
self,
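A minimal sketch (not part of this commit) of the rename this hunk applies, assuming the post-change daf_butler API shown above:

# Illustration only: DataCoordinate.makeEmpty() is now spelled make_empty().
from lsst.daf.butler import DataCoordinate, DimensionUniverse

universe = DimensionUniverse()
empty_data_id = DataCoordinate.make_empty(universe)  # previously: DataCoordinate.makeEmpty(universe)
assert not empty_data_id.dimensions.names  # an empty data ID identifies no dimensions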
12 changes: 6 additions & 6 deletions python/lsst/pipe/base/_observation_dimension_packer.py
@@ -108,20 +108,20 @@ def __init__(
):
if config is None:
config = ObservationDimensionPackerConfig()
- fixed = data_id.subset(data_id.universe.extract(["instrument"]))
+ fixed = data_id.subset(data_id.universe.conform(["instrument"]))
if is_exposure is None:
if "visit" in data_id.graph.names:
if "visit" in data_id.dimensions.names:
is_exposure = False
elif "exposure" in data_id.graph.names:
elif "exposure" in data_id.dimensions.names:
is_exposure = True
else:
raise ValueError(
"'is_exposure' was not provided and 'data_id' has no visit or exposure value."
)
if is_exposure:
- dimensions = fixed.universe.extract(["instrument", "exposure", "detector"])
+ dimensions = fixed.universe.conform(["instrument", "exposure", "detector"])
else:
- dimensions = fixed.universe.extract(["instrument", "visit", "detector"])
+ dimensions = fixed.universe.conform(["instrument", "visit", "detector"])
super().__init__(fixed, dimensions)
self.is_exposure = is_exposure
if config.n_detectors is not None:
@@ -166,7 +166,7 @@ def unpack(self, packedId: int) -> DataCoordinate:
"detector": detector,
("exposure" if self.is_exposure else "visit"): observation,
},
- graph=self.dimensions,
+ dimensions=self._dimensions,
)


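A sketch (not from this commit) of the two renames used in this file, universe.extract() becoming universe.conform() and data_id.graph becoming data_id.dimensions; the instrument, visit, and detector values are made up:

# Illustration only, assuming the post-change daf_butler API shown above.
from lsst.daf.butler import DataCoordinate, DimensionUniverse

universe = DimensionUniverse()
group = universe.conform(["instrument", "visit", "detector"])  # previously universe.extract(...)
data_id = DataCoordinate.standardize(
    instrument="HSC", visit=1228, detector=42, universe=universe  # hypothetical values
)
assert "visit" in data_id.dimensions.names  # previously data_id.graph.names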
20 changes: 10 additions & 10 deletions python/lsst/pipe/base/all_dimensions_quantum_graph_builder.py
@@ -38,7 +38,7 @@
from contextlib import contextmanager
from typing import Any, final

- from lsst.daf.butler import Butler, DimensionGraph
+ from lsst.daf.butler import Butler, DimensionGroup
from lsst.daf.butler.registry import MissingDatasetTypeError
from lsst.daf.butler.registry.queries import DataCoordinateQueryResults
from lsst.utils.logging import LsstLogAdapter
@@ -230,7 +230,7 @@ def _find_followup_datasets(self, query: _AllDimensionsQuery, skeleton: QuantumG
try:
for ref in data_ids.findDatasets(dataset_type_node.name, self.input_collections):
self.existing_datasets.inputs[
- DatasetKey(dataset_type_node.name, ref.dataId.values_tuple())
+ DatasetKey(dataset_type_node.name, ref.dataId.required_values)
] = ref
count += 1
except MissingDatasetTypeError:
@@ -246,7 +246,7 @@ def _find_followup_datasets(self, query: _AllDimensionsQuery, skeleton: QuantumG
count = 0
try:
for ref in data_ids.findDatasets(dataset_type_node.name, self.skip_existing_in):
- key = DatasetKey(dataset_type_node.name, ref.dataId.values_tuple())
+ key = DatasetKey(dataset_type_node.name, ref.dataId.required_values)
self.existing_datasets.outputs_for_skip[key] = ref
count += 1
if ref.run == self.output_run:
@@ -267,7 +267,7 @@ def _find_followup_datasets(self, query: _AllDimensionsQuery, skeleton: QuantumG
try:
for ref in data_ids.findDatasets(dataset_type_node.name, [self.output_run]):
self.existing_datasets.outputs_in_the_way[
- DatasetKey(dataset_type_node.name, ref.dataId.values_tuple())
+ DatasetKey(dataset_type_node.name, ref.dataId.required_values)
] = ref
count += 1
except MissingDatasetTypeError:
@@ -342,7 +342,7 @@ def _find_followup_datasets(self, query: _AllDimensionsQuery, skeleton: QuantumG
query_results = []
for data_id, ref in query_results:
dataset_key = PrerequisiteDatasetKey(finder.dataset_type_node.name, ref.id.bytes)
- quantum_key = QuantumKey(task_node.label, data_id.values_tuple())
+ quantum_key = QuantumKey(task_node.label, data_id.required_values)
# The column-subset operation used to make `data_ids`
# from `common_data_ids` can strip away post-query
# filtering; e.g. if we starts with a {visit, patch}
@@ -382,7 +382,7 @@ class _AllDimensionsQuery:
"""Graph of this subset of the pipeline."""

grouped_by_dimensions: dict[
- DimensionGraph, tuple[dict[str, TaskNode], dict[str, DatasetTypeNode]]
+ DimensionGroup, tuple[dict[str, TaskNode], dict[str, DatasetTypeNode]]
] = dataclasses.field(default_factory=dict)
"""The tasks and dataset types of this subset of the pipeline, grouped
by their dimensions.
@@ -441,7 +441,7 @@ def from_builder(
(
result.empty_dimensions_tasks,
result.empty_dimensions_dataset_types,
- ) = result.grouped_by_dimensions.pop(builder.universe.empty)
+ ) = result.grouped_by_dimensions.pop(builder.universe.empty.as_group())
result.overall_inputs = {
name: node # type: ignore
for name, node in result.subgraph.iter_overall_inputs()
@@ -450,7 +450,7 @@ def from_builder(
dimension_names: set[str] = set()
for dimensions_for_group in result.grouped_by_dimensions.keys():
dimension_names.update(dimensions_for_group.names)
- dimensions = builder.universe.extract(dimension_names)
+ dimensions = builder.universe.conform(dimension_names)
builder.log.debug("Building query for data IDs.")
result.query_args = {
"dimensions": dimensions,
@@ -490,7 +490,7 @@ def from_builder(
)
builder.log.verbose("Querying for data IDs with arguments:")
builder.log.verbose(" dimensions=%s,", list(result.query_args["dimensions"].names))
builder.log.verbose(" dataId=%s,", result.query_args["dataId"].byName())
builder.log.verbose(" dataId=%s,", dict(result.query_args["dataId"].required))
if result.query_args["where"]:
builder.log.verbose(" where=%s,", repr(result.query_args["where"]))
if "datasets" in result.query_args:
@@ -519,7 +519,7 @@ def log_failure(self, log: LsstLogAdapter) -> None:
# so they can read it more easily and copy and paste into
# a Python terminal.
log.critical(" dimensions=%s,", list(self.query_args["dimensions"].names))
log.critical(" dataId=%s,", self.query_args["dataId"].byName())
log.critical(" dataId=%s,", dict(self.query_args["dataId"].required))
if self.query_args["where"]:
log.critical(" where=%s,", repr(self.query_args["where"]))
if "datasets" in self.query_args:
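A sketch (not from this commit) of the data-ID accessors this file switches to, values_tuple() becoming required_values and byName() becoming dict(...required); the values are made up:

# Illustration only, assuming the post-change daf_butler API shown above.
from lsst.daf.butler import DataCoordinate, DimensionUniverse

universe = DimensionUniverse()
data_id = DataCoordinate.standardize(
    instrument="HSC", visit=1228, detector=42, universe=universe  # hypothetical values
)
key_values = data_id.required_values  # tuple of required-dimension values (was values_tuple())
as_mapping = dict(data_id.required)   # plain {dimension name: value} mapping for logging (was byName())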
2 changes: 1 addition & 1 deletion python/lsst/pipe/base/connectionTypes.py
@@ -190,7 +190,7 @@ def makeDatasetType(
"""
return DatasetType(
self.name,
- universe.extract(self.dimensions),
+ universe.conform(self.dimensions),
self.storageClass,
isCalibration=self.isCalibration,
parentStorageClass=parentStorageClass,
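A sketch (not from this commit) of building a DatasetType from conformed dimensions, as makeDatasetType() now does; the dataset type name and storage class are placeholders:

# Illustration only, assuming the post-change daf_butler API shown above.
from lsst.daf.butler import DatasetType, DimensionUniverse

universe = DimensionUniverse()
dataset_type = DatasetType(
    "calexp",                                               # placeholder dataset type name
    universe.conform(["instrument", "visit", "detector"]),  # was universe.extract(...)
    "ExposureF",                                            # placeholder storage class name
)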
2 changes: 1 addition & 1 deletion python/lsst/pipe/base/execution_reports.py
@@ -256,7 +256,7 @@ def to_summary_dict(self, butler: Butler, do_store_logs: bool = True) -> dict[st
"""
failed_quanta = {}
for node_id, log_ref in self.failed.items():
- quantum_info: dict[str, Any] = {"data_id": log_ref.dataId.byName()}
+ quantum_info: dict[str, Any] = {"data_id": dict(log_ref.dataId.required)}
if do_store_logs:
try:
log = butler.get(log_ref)
2 changes: 1 addition & 1 deletion python/lsst/pipe/base/graph/graph.py
@@ -1285,7 +1285,7 @@ def __getstate__(self) -> dict:
dId = node.quantum.dataId
if dId is None:
continue
- universe = dId.graph.universe
+ universe = dId.universe
return {"reduced": self._buildSaveObject(), "graphId": self._buildId, "universe": universe}

def __setstate__(self, state: dict) -> None:
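A sketch (not from this commit) of the simplification above: a DataCoordinate now exposes its universe directly rather than through .graph:

# Illustration only, assuming the post-change daf_butler API shown above.
from lsst.daf.butler import DataCoordinate, DimensionUniverse

universe = DimensionUniverse()
data_id = DataCoordinate.make_empty(universe)
assert data_id.universe is universe  # previously reached via data_id.graph.universe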
21 changes: 6 additions & 15 deletions python/lsst/pipe/base/pipeline.py
@@ -47,14 +47,7 @@

# -----------------------------
# Imports for other modules --
- from lsst.daf.butler import (
-     DataCoordinate,
-     DatasetType,
-     DimensionUniverse,
-     NamedValueSet,
-     Registry,
-     SkyPixDimension,
- )
+ from lsst.daf.butler import DataCoordinate, DatasetType, DimensionUniverse, NamedValueSet, Registry
from lsst.resources import ResourcePath, ResourcePathExpression
from lsst.utils import doImportType
from lsst.utils.introspection import get_full_type_name
@@ -641,7 +634,7 @@ def get_data_id(self, universe: DimensionUniverse) -> DataCoordinate:
instrument_class = cast(PipeBaseInstrument, doImportType(instrument_class_name))
if instrument_class is not None:
return DataCoordinate.standardize(instrument=instrument_class.getName(), universe=universe)
- return DataCoordinate.makeEmpty(universe)
+ return DataCoordinate.make_empty(universe)

def addTask(self, task: type[PipelineTask] | str, label: str) -> None:
"""Add a new task to the pipeline, or replace a task that is already
@@ -1042,10 +1035,8 @@ def makeDatasetTypesSet(
"Note that reference catalog names are now used as the dataset "
"type name instead of 'ref_cat'."
) from err
- rest1 = set(registry.dimensions.extract(dimensions - {"skypix"}).names)
- rest2 = {
-     dim.name for dim in datasetType.dimensions if not isinstance(dim, SkyPixDimension)
- }
+ rest1 = set(registry.dimensions.conform(dimensions - {"skypix"}).names)
+ rest2 = datasetType.dimensions.names - datasetType.dimensions.skypix.names
if rest1 != rest2:
raise ValueError(
f"Non-skypix dimensions for dataset type {c.name} declared in "
@@ -1143,7 +1134,7 @@ def makeDatasetTypesSet(

# Metadata is supposed to be of the TaskMetadata type, its dimensions
# correspond to a task quantum.
- dimensions = registry.dimensions.extract(taskDef.connections.dimensions)
+ dimensions = registry.dimensions.conform(taskDef.connections.dimensions)

# Allow the storage class definition to be read from the existing
# dataset type definition if present.
@@ -1158,7 +1149,7 @@

if taskDef.logOutputDatasetName is not None:
# Log output dimensions correspond to a task quantum.
- dimensions = registry.dimensions.extract(taskDef.connections.dimensions)
+ dimensions = registry.dimensions.conform(taskDef.connections.dimensions)
outputs.update(
{
DatasetType(
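A sketch (not from this commit) of the skypix filtering above, which now uses name sets instead of isinstance checks against SkyPixDimension; the dataset type name, dimensions, and storage class are placeholders:

# Illustration only, assuming the post-change daf_butler API shown above.
from lsst.daf.butler import DatasetType, DimensionUniverse

universe = DimensionUniverse()
dataset_type = DatasetType(
    "refcat_shard",             # placeholder dataset type name
    universe.conform(["htm7"]),  # "htm7" is a skypix dimension in the default universe
    "SimpleCatalog",            # placeholder storage class name
)
# Previously: {d.name for d in dataset_type.dimensions if not isinstance(d, SkyPixDimension)}
non_skypix = dataset_type.dimensions.names - dataset_type.dimensions.skypix.names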
6 changes: 3 additions & 3 deletions python/lsst/pipe/base/pipeline_graph/_dataset_types.py
@@ -33,7 +33,7 @@
from typing import TYPE_CHECKING, Any

import networkx
- from lsst.daf.butler import DatasetRef, DatasetType, DimensionGraph, Registry, StorageClass
+ from lsst.daf.butler import DatasetRef, DatasetType, DimensionGroup, Registry, StorageClass
from lsst.daf.butler.registry import MissingDatasetTypeError

from ._exceptions import DuplicateOutputError
@@ -185,9 +185,9 @@ def key(self) -> NodeKey:
return NodeKey(NodeType.DATASET_TYPE, self.dataset_type.name)

@property
- def dimensions(self) -> DimensionGraph:
+ def dimensions(self) -> DimensionGroup:
"""Dimensions of the dataset type."""
- return self.dataset_type.dimensions
+ return self.dataset_type.dimensions.as_group()

@property
def storage_class_name(self) -> str:
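A sketch (not from this commit) of the conversion used above: the property now turns the dataset type's dimensions into a DimensionGroup with as_group(); the names used here are placeholders:

# Illustration only, assuming the post-change daf_butler API shown above.
from lsst.daf.butler import DatasetType, DimensionUniverse

universe = DimensionUniverse()
dataset_type = DatasetType(
    "calexp", universe.conform(["instrument", "visit", "detector"]), "ExposureF"  # placeholders
)
group = dataset_type.dimensions.as_group()  # DimensionGroup: hashable and usable as a dict key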
12 changes: 6 additions & 6 deletions python/lsst/pipe/base/pipeline_graph/_edges.py
@@ -32,7 +32,7 @@
from collections.abc import Mapping, Sequence
from typing import Any, ClassVar, TypeVar

- from lsst.daf.butler import DatasetRef, DatasetType, DimensionUniverse, SkyPixDimension
+ from lsst.daf.butler import DatasetRef, DatasetType, DimensionUniverse
from lsst.daf.butler.registry import MissingDatasetTypeError
from lsst.utils.classes import immutable

@@ -495,17 +495,17 @@ def _resolve_dataset_type(
f"Note that reference catalog names are now used as the dataset "
f"type name instead of 'ref_cat'."
)
- rest1 = set(universe.extract(self.raw_dimensions - {"skypix"}).names)
- rest2 = {dim.name for dim in current.dimensions if not isinstance(dim, SkyPixDimension)}
+ rest1 = set(universe.conform(self.raw_dimensions - {"skypix"}).names)
+ rest2 = current.dimensions.names - current.dimensions.skypix.names
if rest1 != rest2:
raise IncompatibleDatasetTypeError(
f"Non-skypix dimensions for dataset type {self.dataset_type_name} declared in "
f"connections ({rest1}) are inconsistent with those in "
f"registry's version of this dataset ({rest2})."
)
- dimensions = current.dimensions
+ dimensions = current.dimensions.as_group()
else:
- dimensions = universe.extract(self.raw_dimensions)
+ dimensions = universe.conform(self.raw_dimensions)
is_initial_query_constraint = is_initial_query_constraint and not self.defer_query_constraint
if is_prerequisite is None:
is_prerequisite = self.is_prerequisite
@@ -702,7 +702,7 @@ def _resolve_dataset_type(self, current: DatasetType | None, universe: Dimension
Raised if ``current is not None`` and this edge's definition is not
compatible with it.
"""
- dimensions = universe.extract(self.raw_dimensions)
+ dimensions = universe.conform(self.raw_dimensions)
dataset_type = DatasetType(
self.parent_dataset_type_name,
dimensions,
16 changes: 9 additions & 7 deletions python/lsst/pipe/base/pipeline_graph/_pipeline_graph.py
@@ -37,7 +37,7 @@
import networkx
import networkx.algorithms.bipartite
import networkx.algorithms.dag
- from lsst.daf.butler import DataCoordinate, DataId, DimensionGraph, DimensionUniverse, Registry
+ from lsst.daf.butler import DataCoordinate, DataId, DimensionGroup, DimensionUniverse, Registry
from lsst.resources import ResourcePath, ResourcePathExpression

from ._dataset_types import DatasetTypeNode
@@ -1329,7 +1329,7 @@ def iter_overall_inputs(self) -> Iterator[tuple[str, DatasetTypeNode | None]]:

def group_by_dimensions(
self, prerequisites: bool = False
- ) -> dict[DimensionGraph, tuple[dict[str, TaskNode], dict[str, DatasetTypeNode]]]:
+ ) -> dict[DimensionGroup, tuple[dict[str, TaskNode], dict[str, DatasetTypeNode]]]:
"""Group this graph's tasks and dataset types by their dimensions.
Parameters
@@ -1340,8 +1340,8 @@ def group_by_dimensions(
Returns
-------
- groups : `dict` [ `DimensionGraph`, `tuple` ]
- A dictionary of groups keyed by `DimensionGraph`, in which each
+ groups : `dict` [ `DimensionGroup`, `tuple` ]
+ A dictionary of groups keyed by `DimensionGroup`, in which each
value is a tuple of:
- a `dict` of `TaskNode` instances, keyed by task label
@@ -1355,7 +1355,7 @@ def group_by_dimensions(
Init inputs and outputs are always included, but always have empty
dimensions and are hence are all grouped together.
"""
- result: dict[DimensionGraph, tuple[dict[str, TaskNode], dict[str, DatasetTypeNode]]] = {}
+ result: dict[DimensionGroup, tuple[dict[str, TaskNode], dict[str, DatasetTypeNode]]] = {}
next_new_value: tuple[dict[str, TaskNode], dict[str, DatasetTypeNode]] = ({}, {})
for task_label, task_node in self.tasks.items():
if task_node.dimensions is None:
@@ -1368,7 +1368,9 @@ def group_by_dimensions(
raise UnresolvedGraphError(f"Dataset type {dataset_type_name!r} has not been resolved.")
if not dataset_type_node.is_prerequisite or prerequisites:
if (
- group := result.setdefault(dataset_type_node.dataset_type.dimensions, next_new_value)
+ group := result.setdefault(
+     dataset_type_node.dataset_type.dimensions.as_group(), next_new_value
+ )
) is next_new_value:
next_new_value = ({}, {}) # make new lists for next time
group[1][dataset_type_node.name] = dataset_type_node
@@ -1507,7 +1509,7 @@ def _init_from_args(
universe = data_id.universe
else:
assert universe is data_id.universe, "data_id.universe and given universe differ"
- self._raw_data_id = data_id.byName()
+ self._raw_data_id = dict(data_id.required)
elif data_id is None:
self._raw_data_id = {}
else:
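A sketch (not from this commit) of why DimensionGroup works as the grouping key above: equal groups from the same universe hash and compare equal; the task labels are placeholders:

# Illustration only, assuming the post-change daf_butler API shown above.
from lsst.daf.butler import DimensionUniverse

universe = DimensionUniverse()
by_dimensions: dict = {}
by_dimensions.setdefault(universe.conform(["instrument", "visit"]), []).append("taskA")  # placeholder label
by_dimensions.setdefault(universe.conform(["instrument", "visit"]), []).append("taskB")  # placeholder label
assert len(by_dimensions) == 1  # both conform() calls yield the same DimensionGroup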
