Skip to content

Commit

Permalink
Add support for order_by/limit to query results (DM-32403)
Browse files Browse the repository at this point in the history
The SQL registry method queryDimensionRecords now returns a special
iterable object instead of a plain iterator. The iterables returned from
queryDataIds and queryDimensionRecords have new methods order_by() and
limit(); the same two methods were added to the Query classes. Unit tests
for the query methods were extended to check this new functionality.

Note that the remote registry does not support this functionality (yet).
  • Loading branch information
andy-slac committed Nov 16, 2021
1 parent 11151e1 commit 35f3ab0
Show file tree
Hide file tree
Showing 10 changed files with 945 additions and 110 deletions.
32 changes: 32 additions & 0 deletions python/lsst/daf/butler/core/timespan.py
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,30 @@ def contains(self: _S, other: Union[_S, sqlalchemy.sql.ColumnElement]) -> sqlalc
"""
raise NotImplementedError()

@abstractmethod
def lower(self: _S) -> sqlalchemy.sql.ColumnElement:
    """Return a SQLAlchemy expression representing a lower bound of a
    timespan.

    Returns
    -------
    lower : `sqlalchemy.sql.ColumnElement`
        A SQLAlchemy expression for a lower bound.

    Notes
    -----
    Subclasses must implement this; the base class raises
    `NotImplementedError`.
    """
    raise NotImplementedError()

@abstractmethod
def upper(self: _S) -> sqlalchemy.sql.ColumnElement:
    """Return a SQLAlchemy expression representing an upper bound of a
    timespan.

    Returns
    -------
    upper : `sqlalchemy.sql.ColumnElement`
        A SQLAlchemy expression for an upper bound.

    Notes
    -----
    Subclasses must implement this; the base class raises
    `NotImplementedError`.
    """
    raise NotImplementedError()


class _CompoundTimespanDatabaseRepresentation(TimespanDatabaseRepresentation):
"""Representation of a time span as two separate fields.
Expand Down Expand Up @@ -919,6 +943,14 @@ def contains(
else:
return sqlalchemy.sql.and_(self._nsec[0] <= other._nsec[0], self._nsec[1] >= other._nsec[1])

def lower(self) -> sqlalchemy.sql.ColumnElement:
    # Docstring inherited.
    # In this compound representation the timespan is stored as two
    # separate fields; index 0 holds the lower-bound column.
    return self._nsec[0]

def upper(self) -> sqlalchemy.sql.ColumnElement:
    # Docstring inherited.
    # In this compound representation the timespan is stored as two
    # separate fields; index 1 holds the upper-bound column.
    return self._nsec[1]

def flatten(self, name: Optional[str] = None) -> Iterator[sqlalchemy.sql.ColumnElement]:
# Docstring inherited.
if name is None:
Expand Down
47 changes: 28 additions & 19 deletions python/lsst/daf/butler/registries/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
Mapping,
Optional,
Set,
Tuple,
TYPE_CHECKING,
Union,
)
Expand Down Expand Up @@ -84,6 +85,7 @@
from ..registry.wildcards import CategorizedWildcard, CollectionQuery, Ellipsis
from ..registry.summaries import CollectionSummary
from ..registry.managers import RegistryManagerTypes, RegistryManagerInstances
from ..registry.queries import Query
from ..registry.interfaces import ChainedCollectionRecord, DatasetIdGenEnum, RunRecord

if TYPE_CHECKING:
Expand Down Expand Up @@ -972,23 +974,30 @@ def queryDataIds(self, dimensions: Union[Iterable[Union[Dimension, str]], Dimens
elif collections:
raise TypeError(f"Cannot pass 'collections' (='{collections}') without 'datasets'.")

summary = queries.QuerySummary(
requested=requestedDimensions,
dataId=standardizedDataId,
expression=where,
bind=bind,
defaults=self.defaults.dataId,
check=check,
datasets=standardizedDatasetTypes,
)
builder = self._makeQueryBuilder(
summary,
doomed_by=[f"Dataset type {name} is not registered." for name in missing]
)
for datasetType in standardizedDatasetTypes:
builder.joinDataset(datasetType, collections, isResult=False)
query = builder.finish()
return queries.DataCoordinateQueryResults(self._db, query)
def query_factory(order_by: Optional[Iterable[str]] = None,
                  limit: Optional[Tuple[int, Optional[int]]] = None) -> Query:
    """Construct the Query object that generates query results.

    Parameters
    ----------
    order_by : `Iterable` [ `str` ], optional
        Names of the dimensions/columns used for result ordering, or
        `None` for no explicit ordering.
    limit : `Tuple` [ `int`, `Optional` [ `int` ] ], optional
        Limit on the number of returned rows and an optional offset, or
        `None` for no limit.

    Returns
    -------
    query : `Query`
        Query built from the arguments of the enclosing ``queryDataIds``
        call (captured via closure) plus the ordering/limit options.
    """
    # All inputs other than order_by/limit come from the enclosing
    # queryDataIds scope, so repeated calls with different ordering
    # rebuild an equivalent query.
    summary = queries.QuerySummary(
        requested=requestedDimensions,
        dataId=standardizedDataId,
        expression=where,
        bind=bind,
        defaults=self.defaults.dataId,
        check=check,
        datasets=standardizedDatasetTypes,
        order_by=order_by,
        limit=limit
    )
    builder = self._makeQueryBuilder(
        summary,
        doomed_by=[f"Dataset type {name} is not registered." for name in missing]
    )
    # Join dataset tables only as constraints, not as result columns.
    for datasetType in standardizedDatasetTypes:
        builder.joinDataset(datasetType, collections, isResult=False,)
    return builder.finish()

return queries.DataCoordinateQueryResults(self._db, query_factory, requestedDimensions)

def queryDimensionRecords(self, element: Union[DimensionElement, str], *,
dataId: Optional[DataId] = None,
Expand All @@ -998,7 +1007,7 @@ def queryDimensionRecords(self, element: Union[DimensionElement, str], *,
components: Optional[bool] = None,
bind: Optional[Mapping[str, Any]] = None,
check: bool = True,
**kwargs: Any) -> Iterator[DimensionRecord]:
**kwargs: Any) -> queries.DimensionRecordQueryResults:
# Docstring inherited from lsst.daf.butler.registry.Registry
if not isinstance(element, DimensionElement):
try:
Expand All @@ -1008,7 +1017,7 @@ def queryDimensionRecords(self, element: Union[DimensionElement, str], *,
+ str(self.dimensions.getStaticElements())) from e
dataIds = self.queryDataIds(element.graph, dataId=dataId, datasets=datasets, collections=collections,
where=where, components=components, bind=bind, check=check, **kwargs)
return iter(self._managers.dimensions[element].fetch(dataIds))
return queries.DatabaseDimensionRecordQueryResults(dataIds, self._managers.dimensions[element])

def queryDatasetAssociations(
self,
Expand Down
24 changes: 12 additions & 12 deletions python/lsst/daf/butler/registry/_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -1119,8 +1119,8 @@ def queryDatasetTypes(self, expression: Any = ..., *, components: Optional[bool]
expression : `Any`, optional
An expression that fully or partially identifies the dataset types
to return, such as a `str`, `re.Pattern`, or iterable thereof.
`...` can be used to return all dataset types, and is the default.
See :ref:`daf_butler_dataset_type_expressions` for more
``...`` can be used to return all dataset types, and is the
default. See :ref:`daf_butler_dataset_type_expressions` for more
information.
components : `bool`, optional
If `True`, apply all expression patterns to component dataset type
Expand Down Expand Up @@ -1154,7 +1154,7 @@ def queryCollections(self, expression: Any = ...,
expression : `Any`, optional
An expression that identifies the collections to return, such as
a `str` (for full matches or partial matches via globs),
`re.Pattern` (for partial matches), or iterable thereof. `...`
`re.Pattern` (for partial matches), or iterable thereof. ``...``
can be used to return all collections, and is the default.
See :ref:`daf_butler_collection_expressions` for more information.
datasetType : `DatasetType`, optional
Expand Down Expand Up @@ -1197,13 +1197,13 @@ def queryDatasets(self, datasetType: Any, *,
datasetType
An expression that fully or partially identifies the dataset types
to be queried. Allowed types include `DatasetType`, `str`,
`re.Pattern`, and iterables thereof. The special value `...` can
`re.Pattern`, and iterables thereof. The special value ``...`` can
be used to query all dataset types. See
:ref:`daf_butler_dataset_type_expressions` for more information.
collections: optional
An expression that identifies the collections to search, such as a
`str` (for full matches or partial matches via globs), `re.Pattern`
(for partial matches), or iterable thereof. `...` can be used to
(for partial matches), or iterable thereof. ``...`` can be used to
search all collections (actually just all `~CollectionType.RUN`
collections, because this will still find all datasets).
If not provided, ``self.default.collections`` is used. See
Expand All @@ -1228,7 +1228,7 @@ def queryDatasets(self, datasetType: Any, *,
collection in which a dataset of that dataset type appears
(according to the order of ``collections`` passed in). If `True`,
``collections`` must not contain regular expressions and may not
be `...`.
be ``...``.
components : `bool`, optional
If `True`, apply all dataset expression patterns to component
dataset type names as well. If `False`, never apply patterns to
Expand Down Expand Up @@ -1319,7 +1319,7 @@ def queryDataIds(self, dimensions: Union[Iterable[Union[Dimension, str]], Dimens
An expression that identifies the collections to search for
datasets, such as a `str` (for full matches or partial matches
via globs), `re.Pattern` (for partial matches), or iterable
thereof. `...` can be used to search all collections (actually
thereof. ``...`` can be used to search all collections (actually
just all `~CollectionType.RUN` collections, because this will
still find all datasets). If not provided,
``self.default.collections`` is used. Ignored unless ``datasets``
Expand Down Expand Up @@ -1381,7 +1381,7 @@ def queryDimensionRecords(self, element: Union[DimensionElement, str], *,
components: Optional[bool] = None,
bind: Optional[Mapping[str, Any]] = None,
check: bool = True,
**kwargs: Any) -> Iterator[DimensionRecord]:
**kwargs: Any) -> Iterable[DimensionRecord]:
"""Query for dimension information matching user-provided criteria.
Parameters
Expand All @@ -1395,11 +1395,11 @@ def queryDimensionRecords(self, element: Union[DimensionElement, str], *,
An expression that fully or partially identifies dataset types
that should constrain the yielded records. See `queryDataIds` and
:ref:`daf_butler_dataset_type_expressions` for more information.
collections: `Any`, optional
collections : `Any`, optional
An expression that identifies the collections to search for
datasets, such as a `str` (for full matches or partial matches
via globs), `re.Pattern` (for partial matches), or iterable
thereof. `...` can be used to search all collections (actually
thereof. ``...`` can be used to search all collections (actually
just all `~CollectionType.RUN` collections, because this will
still find all datasets). If not provided,
``self.default.collections`` is used. Ignored unless ``datasets``
Expand Down Expand Up @@ -1428,7 +1428,7 @@ def queryDimensionRecords(self, element: Union[DimensionElement, str], *,
Returns
-------
dataIds : `DataCoordinateQueryResults`
dataIds : `Iterator` [ `DimensionRecord` ]
Data IDs matching the given query parameters.
"""
raise NotImplementedError()
Expand Down Expand Up @@ -1458,7 +1458,7 @@ def queryDatasetAssociations(
An expression that identifies the collections to search for
datasets, such as a `str` (for full matches or partial matches
via globs), `re.Pattern` (for partial matches), or iterable
thereof. `...` can be used to search all collections (actually
thereof. ``...`` can be used to search all collections (actually
just all `~CollectionType.RUN` collections, because this will still
find all datasets). If not provided, ``self.default.collections``
is used. See :ref:`daf_butler_collection_expressions` for more
Expand Down
8 changes: 8 additions & 0 deletions python/lsst/daf/butler/registry/databases/postgresql.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,14 @@ def contains(self, other: Union[_RangeTimespanRepresentation, sqlalchemy.sql.Col
else:
return self.column.contains(other)

def lower(self) -> sqlalchemy.sql.ColumnElement:
    # Docstring inherited.
    # Apply the SQL lower() function to the range-typed column to
    # extract the lower bound of the stored timespan.
    return sqlalchemy.sql.func.lower(self.column)

def upper(self) -> sqlalchemy.sql.ColumnElement:
    # Docstring inherited.
    # Apply the SQL upper() function to the range-typed column to
    # extract the upper bound of the stored timespan.
    return sqlalchemy.sql.func.upper(self.column)

def flatten(self, name: Optional[str] = None) -> Iterator[sqlalchemy.sql.ColumnElement]:
# Docstring inherited.
if name is None:
Expand Down
62 changes: 61 additions & 1 deletion python/lsst/daf/butler/registry/queries/_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

__all__ = ("QueryBuilder",)

import dataclasses
from typing import AbstractSet, Any, Iterable, List, Optional

import sqlalchemy.sql
Expand All @@ -35,11 +36,12 @@
)

from ...core.named import NamedKeyDict, NamedValueAbstractSet, NamedValueSet
from ...core import ddl

from .._collectionType import CollectionType
from ._structs import QuerySummary, QueryColumns, DatasetQueryColumns, RegistryManagers
from .expressions import convertExpressionToSql
from ._query import DirectQuery, DirectQueryUniqueness, EmptyQuery, Query
from ._query import DirectQuery, DirectQueryUniqueness, EmptyQuery, OrderByColumn, Query
from ..wildcards import CollectionSearch, CollectionQuery


Expand Down Expand Up @@ -546,5 +548,63 @@ def finish(self, joinMissing: bool = True) -> Query:
whereRegion=self.summary.where.dataId.region,
simpleQuery=self._simpleQuery,
columns=self._columns,
order_by_columns=self._order_by_columns(),
limit=self.summary.limit,
managers=self._managers,
doomed_by=self._doomed_by)

def _order_by_columns(self) -> Iterable[OrderByColumn]:
    """Generate columns to be used for ORDER BY clause.

    Returns
    -------
    order_by_columns : `Iterable` [ `OrderByColumn` ]
        Sequence of columns to appear in ORDER BY clause, empty when the
        query summary requests no ordering.
    """
    order_by_columns: List[OrderByColumn] = []
    if not self.summary.order_by:
        # No ordering requested; nothing to add to the query.
        return order_by_columns

    for order_by_column in self.summary.order_by.order_by_columns:

        column: sqlalchemy.sql.ColumnElement
        field_spec: Optional[ddl.FieldSpec]
        dimension: Optional[Dimension] = None
        if order_by_column.column is None:
            # dimension name, it has to be in SELECT list already, only
            # add it to ORDER BY
            assert isinstance(order_by_column.element, Dimension), "expecting full Dimension"
            column = self._columns.getKeyColumn(order_by_column.element)
            add_to_select = False
            field_spec = None
            dimension = order_by_column.element
        else:
            table = self._elements[order_by_column.element]

            if order_by_column.column in ("timespan.begin", "timespan.end"):
                # Timespan bounds are not plain table columns; ask the
                # database-specific timespan representation for SQL
                # expressions of the requested bound.
                TimespanReprClass = self._managers.TimespanReprClass
                timespan_repr = TimespanReprClass.fromSelectable(table)
                if order_by_column.column == "timespan.begin":
                    column = timespan_repr.lower()
                    label = f"{order_by_column.element.name}_timespan_begin"
                else:
                    column = timespan_repr.upper()
                    label = f"{order_by_column.element.name}_timespan_end"
                # Bounds are exposed as nullable big integers (nanoseconds).
                field_spec = ddl.FieldSpec(label, dtype=sqlalchemy.BigInteger, nullable=True)
            else:
                column = table.columns[order_by_column.column]
                # make a unique label for it
                label = f"{order_by_column.element.name}_{order_by_column.column}"
                # Reuse the element's record field spec, renamed to the
                # unique label so it can be carried through the SELECT.
                field_spec = order_by_column.element.RecordClass.fields.facts[order_by_column.column]
                field_spec = dataclasses.replace(field_spec, name=label)

            # Non-dimension columns must also be added to the SELECT list
            # (labeled) so the ORDER BY expression is valid.
            column = column.label(label)
            add_to_select = True

        order_by_columns.append(
            OrderByColumn(column=column, ordering=order_by_column.ordering,
                          add_to_select=add_to_select, field_spec=field_spec,
                          dimension=dimension)
        )

    return order_by_columns

0 comments on commit 35f3ab0

Please sign in to comment.