Add dataset query support to new query system.
TallJimbo committed Dec 11, 2023
1 parent 758a7d2 commit 0e68108
Showing 4 changed files with 280 additions and 10 deletions.
48 changes: 47 additions & 1 deletion python/lsst/daf/butler/queries/_query.py
@@ -35,6 +35,11 @@
from .._query import Query
from ..dimensions import DataCoordinate, DataId, DataIdValue, DimensionGroup
from .data_coordinate_results import DataCoordinateResultSpec, RelationDataCoordinateQueryResults
from .dataset_results import (
ChainedDatasetQueryResults,
DatasetRefResultSpec,
RelationSingleTypeDatasetQueryResults,
)
from .dimension_record_results import DimensionRecordResultSpec, RelationDimensionRecordQueryResults
from .driver import QueryDriver
from .expression_factory import ExpressionFactory, ExpressionProxy
@@ -171,6 +176,8 @@ def data_ids(
)
return RelationDataCoordinateQueryResults(self._driver, tree, result_spec)

# TODO: add `typing.overload` variants for single-dataset-type and patterns.

def datasets(
self,
dataset_type: Any,
@@ -183,7 +190,46 @@
**kwargs: Any,
) -> DatasetQueryResults:
# Docstring inherited.
raise NotImplementedError("TODO")
resolved_dataset_types = self._driver.resolve_dataset_type_wildcard(dataset_type)
data_id = DataCoordinate.standardize(data_id, universe=self._driver.universe, **kwargs)
where_terms = convert_where_args(self._tree, where, data_id, bind=bind, **kwargs)
single_type_results: list[RelationSingleTypeDatasetQueryResults] = []
for name, resolved_dataset_type in resolved_dataset_types.items():
tree = self._tree
if name not in tree.available_dataset_types:
resolved_collections, collections_ordered = self._driver.resolve_collection_wildcard(
collections
)
if find_first and not collections_ordered:
raise InvalidRelationError(
f"Unordered collections argument {collections} requires find_first=False."
)
tree = tree.join(
DatasetSearch.model_construct(
dataset_type=name,
dimensions=resolved_dataset_type.dimensions.as_group(),
collections=tuple(resolved_collections),
)
)
elif collections is not None:
raise InvalidRelationError(
f"Dataset type {name!r} was already joined into this query but new collections "
f"{collections!r} were still provided."
)
if where_terms:
tree = tree.where(*where_terms)
if find_first:
tree = tree.find_first(name, resolved_dataset_type.dimensions.as_group())
spec = DatasetRefResultSpec.model_construct(
dataset_type=resolved_dataset_type, include_dimension_records=self._include_dimension_records
)
single_type_results.append(
RelationSingleTypeDatasetQueryResults(self._driver, tree=tree, spec=spec)
)
if len(single_type_results) == 1:
return single_type_results[0]
else:
return ChainedDatasetQueryResults(tuple(single_type_results))
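
The closing branch means a single matched dataset type yields a `RelationSingleTypeDatasetQueryResults` directly, while a wildcard matching several types yields a `ChainedDatasetQueryResults` wrapper. A minimal usage sketch, assuming an existing `Butler` whose `_query()` context manager returns this `Query` class; the dataset type "raw", the collection name, and the where clause are illustrative only:

    # Hypothetical usage; the names below are not from this commit.
    with butler._query() as query:
        refs = query.datasets(
            "raw",
            collections=["HSC/defaults"],
            where="instrument = 'HSC' AND exposure > 100",
            find_first=True,
        )
        for ref in refs:  # DatasetRef objects, fetched page by page
            print(ref.id, ref.dataId)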

def dimension_records(
self,
8 changes: 5 additions & 3 deletions python/lsst/daf/butler/queries/data_coordinate_results.py
@@ -77,7 +77,7 @@ class DataCoordinateResultPage(pydantic.BaseModel):


class RelationDataCoordinateQueryResults(DataCoordinateQueryResults):
"""Implementation of DataCoordinateQueryResults for the relation-based
"""Implementation of `DataCoordinateQueryResults` for the relation-based
query system.

Parameters
@@ -145,7 +145,9 @@ def expanded(self) -> DataCoordinateQueryResults:
return RelationDataCoordinateQueryResults(
self._driver,
tree=self._tree,
- spec=DataCoordinateResultSpec(dimensions=self._spec.dimensions, include_dimension_records=True),
spec=DataCoordinateResultSpec.model_construct(
dimensions=self._spec.dimensions, include_dimension_records=True
),
)

def subset(
@@ -170,7 +172,7 @@ def subset(
return RelationDataCoordinateQueryResults(
self._driver,
tree=self._tree,
- spec=DataCoordinateResultSpec(
spec=DataCoordinateResultSpec.model_construct(
dimensions=dimensions, include_dimension_records=self._spec.include_dimension_records
),
)
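
Both hunks in this file swap direct construction for `model_construct`, Pydantic v2's constructor that skips validation; that is reasonable here because the arguments are internal objects already validated upstream. A self-contained toy illustration of the difference (the `Spec` model is a stand-in, not code from this repository):

    import pydantic

    class Spec(pydantic.BaseModel):  # toy stand-in for DataCoordinateResultSpec
        include_dimension_records: bool

    s1 = Spec(include_dimension_records="true")  # validated: string coerced to True
    s2 = Spec.model_construct(include_dimension_records=True)  # no validation run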
183 changes: 178 additions & 5 deletions python/lsst/daf/butler/queries/dataset_results.py
@@ -30,24 +30,34 @@
__all__ = (
"DatasetRefResultSpec",
"DatasetRefResultPage",
"RelationSingleTypeDatasetQueryResults",
)

import itertools
from collections.abc import Iterable, Iterator
from contextlib import ExitStack, contextmanager
from typing import Literal

import pydantic

from .._dataset_ref import DatasetRef
- from ..dimensions import DimensionGroup
- from .driver import PageKey
from .._dataset_type import DatasetType
from .._query_results import SingleTypeDatasetQueryResults
from .data_coordinate_results import (
DataCoordinateResultSpec,
DatasetQueryResults,
RelationDataCoordinateQueryResults,
)
from .driver import PageKey, QueryDriver
from .relation_tree import Materialization, RootRelation, make_unit_relation


class DatasetRefResultSpec(pydantic.BaseModel):
"""Specification for a query that yields `DatasetRef` objects."""

result_type: Literal["dataset_ref"] = "dataset_ref"
- dataset_type_name: str | None
- dimensions: DimensionGroup
- with_dimension_records: bool
dataset_type: DatasetType
include_dimension_records: bool


class DatasetRefResultPage(pydantic.BaseModel):
@@ -60,3 +70,166 @@ class DatasetRefResultPage(pydantic.BaseModel):
# attached DimensionRecords and is Pydantic-friendly. Right now this model
# isn't actually serializable.
rows: list[DatasetRef]


class RelationSingleTypeDatasetQueryResults(SingleTypeDatasetQueryResults):
"""Implementation of `SingleTypeDatasetQueryResults` for the relation-based
query system.

Parameters
----------
driver : `QueryDriver`
Implementation object that knows how to actually execute queries.
tree : `RootRelation`
Description of the query as a tree of relation operations, derived
from the originating `Query`.
spec : `DatasetRefResultSpec`
Specification for the details of the dataset references to return.

Notes
-----
Ideally this will eventually just be "SingleTypeDatasetQueryResults",
because we won't need an ABC if this is the only implementation.
"""

def __init__(self, driver: QueryDriver, tree: RootRelation, spec: DatasetRefResultSpec):
self._driver = driver
self._tree = tree
self._spec = spec

def __iter__(self) -> Iterator[DatasetRef]:
page = self._driver.execute(self._tree, self._spec)
yield from page.rows
while page.next_key is not None:
page = self._driver.fetch_next_page(self._spec, page.next_key)
yield from page.rows

@contextmanager
def materialize(self) -> Iterator[RelationSingleTypeDatasetQueryResults]:
# Docstring inherited from DatasetQueryResults.
key = self._driver.materialize(self._tree, frozenset())
yield RelationSingleTypeDatasetQueryResults(
self._driver,
tree=make_unit_relation(self._driver.universe).join(
Materialization.model_construct(
key=key, operand=self._tree, dataset_types=frozenset({self.dataset_type.name})
)
),
spec=self._spec,
)
# TODO: Right now we just rely on the QueryDriver context instead of
# using this one. If we want this to remain a context manager, we
# should make it do something, e.g. by adding a QueryDriver method to
# drop a materialization.

@property
def dataset_type(self) -> DatasetType:
# Docstring inherited.
return self._spec.dataset_type

@property
def data_ids(self) -> RelationDataCoordinateQueryResults:
# Docstring inherited.
return RelationDataCoordinateQueryResults(
self._driver,
tree=self._tree,
spec=DataCoordinateResultSpec.model_construct(
dimensions=self.dataset_type.dimensions.as_group(),
include_dimension_records=self._spec.include_dimension_records,
),
)

def expanded(self) -> RelationSingleTypeDatasetQueryResults:
# Docstring inherited.
if self._spec.include_dimension_records:
return self
return RelationSingleTypeDatasetQueryResults(
self._driver,
tree=self._tree,
spec=DatasetRefResultSpec.model_construct(
dataset_type=self.dataset_type, include_dimension_records=True
),
)

def by_dataset_type(self) -> Iterator[SingleTypeDatasetQueryResults]:
# Docstring inherited.
return iter((self,))

def count(self, *, exact: bool = True, discard: bool = False) -> int:
# Docstring inherited.
return self._driver.count(self._tree, exact=exact, discard=discard)

def any(self, *, execute: bool = True, exact: bool = True) -> bool:
# Docstring inherited.
return self._driver.any(self._tree, execute=execute, exact=exact)

def explain_no_results(self, execute: bool = True) -> Iterable[str]:
# Docstring inherited.
return self._driver.explain_no_results(self._tree, execute=execute)
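
The `__iter__` method above establishes the paging contract with the driver: `execute` returns the first page, then `fetch_next_page` is called until `next_key` runs out. A runnable toy illustration of that loop shape (`FakePage` and `FakeDriver` are hypothetical stand-ins for `DatasetRefResultPage` and `QueryDriver`):

    from dataclasses import dataclass

    @dataclass
    class FakePage:  # stand-in for a result page
        rows: list[str]
        next_key: int | None

    class FakeDriver:  # stand-in for the QueryDriver paging interface
        _pages = [FakePage(["a", "b"], next_key=1), FakePage(["c"], next_key=None)]

        def execute(self, tree, spec):
            return self._pages[0]

        def fetch_next_page(self, spec, key):
            return self._pages[key]

    def iter_rows(driver, tree=None, spec=None):
        # Same loop shape as RelationSingleTypeDatasetQueryResults.__iter__.
        page = driver.execute(tree, spec)
        yield from page.rows
        while page.next_key is not None:
            page = driver.fetch_next_page(spec, page.next_key)
            yield from page.rows

    assert list(iter_rows(FakeDriver())) == ["a", "b", "c"]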


class ChainedDatasetQueryResults(DatasetQueryResults):
"""Implementation of `DatasetQueryResults` that delegates to a sequence
of `SingleTypeDatasetQueryResults`.

Parameters
----------
by_dataset_type : `tuple` [ `SingleTypeDatasetQueryResults` ]
Tuple of single-dataset-type query result objects to combine.

Notes
-----
Ideally this will eventually just be "DatasetQueryResults", because we
won't need an ABC if this is the only implementation.
"""

def __init__(self, by_dataset_type: tuple[SingleTypeDatasetQueryResults, ...]):
self._by_dataset_type = by_dataset_type

def __iter__(self) -> Iterator[DatasetRef]:
return itertools.chain.from_iterable(self._by_dataset_type)

def by_dataset_type(self) -> Iterator[SingleTypeDatasetQueryResults]:
# Docstring inherited.
return iter(self._by_dataset_type)

@contextmanager
def materialize(self) -> Iterator[DatasetQueryResults]:
# Docstring inherited.
with ExitStack() as exit_stack:
yield ChainedDatasetQueryResults(
tuple(
[
exit_stack.enter_context(single_type_results.materialize())
for single_type_results in self._by_dataset_type
]
)
)

def expanded(self) -> ChainedDatasetQueryResults:
# Docstring inherited.
return ChainedDatasetQueryResults(
tuple([single_type_results.expanded() for single_type_results in self._by_dataset_type])
)

def count(self, *, exact: bool = True, discard: bool = False) -> int:
# Docstring inherited.
return sum(
single_type_results.count(exact=exact, discard=discard)
for single_type_results in self._by_dataset_type
)

def any(self, *, execute: bool = True, exact: bool = True) -> bool:
# Docstring inherited.
return any(
single_type_results.any(execute=execute, exact=exact)
for single_type_results in self._by_dataset_type
)

def explain_no_results(self, execute: bool = True) -> Iterable[str]:
# Docstring inherited.
messages: list[str] = []
for single_type_results in self._by_dataset_type:
messages.extend(single_type_results.explain_no_results(execute=execute))
return messages
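
`ChainedDatasetQueryResults.materialize` relies on `contextlib.ExitStack`, the standard idiom for entering a variable number of context managers and unwinding them together. A self-contained sketch of that pattern:

    from contextlib import ExitStack, contextmanager

    @contextmanager
    def child(name):  # stand-in for one single-type materialize() context
        print(f"enter {name}")
        yield name
        print(f"exit {name}")

    with ExitStack() as stack:
        handles = [stack.enter_context(child(n)) for n in ("bias", "flat")]
        print(handles)  # ['bias', 'flat']; both contexts exit when the stack closes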
51 changes: 50 additions & 1 deletion python/lsst/daf/butler/queries/driver.py
@@ -33,17 +33,21 @@
from abc import abstractmethod
from collections.abc import Iterable
from contextlib import AbstractContextManager
- from typing import Annotated, TypeAlias, Union, overload
from typing import TYPE_CHECKING, Annotated, Any, TypeAlias, Union, overload

import pydantic

from .._dataset_type import DatasetType
from ..dimensions import DataIdValue, DimensionGroup, DimensionUniverse
from .data_coordinate_results import DataCoordinateResultPage, DataCoordinateResultSpec
from .dataset_results import DatasetRefResultPage, DatasetRefResultSpec
from .dimension_record_results import DimensionRecordResultPage, DimensionRecordResultSpec
from .general_results import GeneralResultPage, GeneralResultSpec
from .relation_tree import MaterializationKey, RootRelation, UploadKey

if TYPE_CHECKING:
from ..registry import CollectionArgType

PageKey: TypeAlias = uuid.UUID


@@ -63,6 +67,8 @@ class QueryDriver(AbstractContextManager[None]):
"""Base class for the implementation object inside `RelationQuery` objects
that is specialized for DirectButler vs. RemoteButler.

Notes
-----
Implementations should be context managers. This allows them to manage the
lifetime of server-side state, such as:
@@ -300,6 +306,49 @@ def explain_no_results(self, tree: RootRelation, execute: bool) -> Iterable[str]
"""
raise NotImplementedError()

@abstractmethod
def resolve_collection_wildcard(
self, collections: CollectionArgType | None = None
) -> tuple[list[str], bool]:
"""Resolve a collection argument into a sequence of collection names.
Parameters
----------
collections : `CollectionArgType` or `None`, optional
Collection search path argument. If `None`, the default
collections for the client should be used, if there are any.

Returns
-------
matched : `list` [ `str` ]
Matching collection names. `~CollectionType.CHAINED` collections
are included directly rather than flattened.
ordered : `bool`
If `True`, the expression specified an order that can be used in
a find-first search.
"""
raise NotImplementedError()
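
The two-part return value lets callers enforce ordering requirements, mirroring the `find_first` guard in `Query.datasets` above. A hedged caller-side sketch (the helper name and error text are illustrative):

    def checked_collections(driver, collections, find_first: bool) -> list[str]:
        # An unordered expansion (e.g. from a pattern) cannot drive a
        # find-first search, so reject that combination up front.
        names, ordered = driver.resolve_collection_wildcard(collections)
        if find_first and not ordered:
            raise ValueError("find-first search requires an ordered collection list")
        return names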

@abstractmethod
def resolve_dataset_type_wildcard(self, dataset_type: Any) -> dict[str, DatasetType]:
"""Resolve a dataset type argument into a mapping of `DatasetType`
objects.

Parameters
----------
dataset_type : `str`, `DatasetType`, or wildcard
Dataset type name, object, or wildcard to resolve.

Returns
-------
matched : `dict` [ `str`, `DatasetType` ]
Mapping from dataset type name to dataset type. Storage classes
passed in should be preserved, but component dataset types should
result in an exception.
"""
raise NotImplementedError()
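
A toy model of the intended semantics, under the assumptions stated in the docstring (a literal name resolves to one entry, a `...` wildcard to everything, and component dataset types are rejected); the helper below is illustrative, not part of this API:

    def resolve_dataset_type_wildcard_demo(expr, registry: dict) -> dict:
        # registry: mapping of dataset type name -> DatasetType (toy stand-in).
        if expr is ...:
            return dict(registry)  # wildcard: every registered dataset type
        if isinstance(expr, str):
            if "." in expr:
                raise TypeError(f"Component dataset type {expr!r} is not allowed.")
            return {expr: registry[expr]}
        raise TypeError(f"Unsupported dataset type expression: {expr!r}")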

@abstractmethod
def get_dataset_dimensions(self, name: str) -> DimensionGroup:
"""Return the dimensions for a dataset type."""
raise NotImplementedError()
