Skip to content

Commit

Permalink
Reimplement order_by method for query results (DM-33164)
Browse files Browse the repository at this point in the history
New implementation uses the window function row_number() to generate
an ordering rank for the DataIds query made by the queryDataIds method. This
value is stored as a separate column by the materialized query and is also
propagated to the top-level query, where it is again used for ordering.
This simplifies handling of the ordering columns by avoiding tracking
them across different contexts, but it relies on the fact that there is
only a single query that generates those values. If there needs to be
a UNION of multiple queries, it cannot order results across multiple
queries. As far as I can tell currently we don't need to UNIONize
queries for queryDataIds or queryDimensionRecords purposes, so we should
be OK.
  • Loading branch information
andy-slac committed Jan 11, 2022
1 parent 853e70c commit 8d7bf43
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 116 deletions.
20 changes: 19 additions & 1 deletion python/lsst/daf/butler/core/simpleQuery.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

__all__ = ("SimpleQuery",)

from typing import Any, ClassVar, List, Optional, Type, TypeVar, Union
from typing import Any, ClassVar, List, Optional, Tuple, Type, TypeVar, Union

import sqlalchemy

Expand All @@ -39,6 +39,8 @@ class SimpleQuery:
def __init__(self) -> None:
self.columns = []
self.where = []
self.order_by = []
self.limit = None
self._from: Optional[sqlalchemy.sql.FromClause] = None

class Select:
Expand Down Expand Up @@ -126,6 +128,12 @@ def combine(self) -> sqlalchemy.sql.Select:
result = result.select_from(self._from)
if self.where:
result = result.where(sqlalchemy.sql.and_(*self.where))
if self.order_by:
result = result.order_by(*self.order_by)
if self.limit:
result = result.limit(self.limit[0])
if self.limit[1] is not None:
result = result.offset(self.limit[1])
return result

@property
Expand All @@ -152,6 +160,8 @@ def copy(self) -> SimpleQuery:
result = SimpleQuery()
result.columns = list(self.columns)
result.where = list(self.where)
result.order_by = list(self.order_by)
result.limit = self.limit
result._from = self._from
return result

Expand All @@ -164,3 +174,11 @@ def copy(self) -> SimpleQuery:
"""Boolean expressions that will be combined with AND to form the WHERE
clause (`list` [ `sqlalchemy.sql.ColumnElement` ]).
"""

order_by: List[sqlalchemy.sql.ColumnElement]
"""Columns to appear in ORDER BY clause (`list`
[`sqlalchemy.sql.ColumnElement` ])
"""

limit: Optional[Tuple[int, Optional[int]]]
"""Limit on the number of returned rows and optional offset (`tuple`)"""
24 changes: 3 additions & 21 deletions python/lsst/daf/butler/registry/queries/_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@

__all__ = ("QueryBuilder",)

import dataclasses
from typing import AbstractSet, Any, Iterable, List, Optional

import sqlalchemy.sql

from ...core import DatasetType, Dimension, DimensionElement, SimpleQuery, SkyPixDimension, ddl
from ...core import DatasetType, Dimension, DimensionElement, SimpleQuery, SkyPixDimension
from ...core.named import NamedKeyDict, NamedValueAbstractSet, NamedValueSet
from .._collectionType import CollectionType
from ..wildcards import CollectionQuery, CollectionSearch
Expand Down Expand Up @@ -570,16 +569,11 @@ def _order_by_columns(self) -> Iterable[OrderByColumn]:
for order_by_column in self.summary.order_by.order_by_columns:

column: sqlalchemy.sql.ColumnElement
field_spec: Optional[ddl.FieldSpec]
dimension: Optional[Dimension] = None
if order_by_column.column is None:
# dimension name, it has to be in SELECT list already, only
# add it to ORDER BY
assert isinstance(order_by_column.element, Dimension), "expecting full Dimension"
column = self._columns.getKeyColumn(order_by_column.element)
add_to_select = False
field_spec = None
dimension = order_by_column.element
else:
table = self._elements[order_by_column.element]

Expand All @@ -592,25 +586,13 @@ def _order_by_columns(self) -> Iterable[OrderByColumn]:
else:
column = timespan_repr.upper()
label = f"{order_by_column.element.name}_timespan_end"
field_spec = ddl.FieldSpec(label, dtype=sqlalchemy.BigInteger, nullable=True)
else:
column = table.columns[order_by_column.column]
# make a unique label for it
label = f"{order_by_column.element.name}_{order_by_column.column}"
field_spec = order_by_column.element.RecordClass.fields.facts[order_by_column.column]
field_spec = dataclasses.replace(field_spec, name=label)

column = column.label(label)
add_to_select = True

order_by_columns.append(
OrderByColumn(
column=column,
ordering=order_by_column.ordering,
add_to_select=add_to_select,
field_spec=field_spec,
dimension=dimension,
)
)

order_by_columns.append(OrderByColumn(column=column, ordering=order_by_column.ordering))

return order_by_columns
98 changes: 21 additions & 77 deletions python/lsst/daf/butler/registry/queries/_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,46 +63,13 @@ class OrderByColumn:
ordering: bool
"""True for ascending order, False for descending (`bool`)."""

add_to_select: bool
"""True if columns is a non-key column and needs to be added to select
columns explicitly (`bool`)."""

field_spec: Optional[ddl.FieldSpec]
"""Field specification for a column in materialized table (`ddl.FieldSpec`)
"""

dimension: Optional[Dimension]
"""Not-None if column corresponds to a dimension (`Dimension` or `None`)"""

@property
def column_order(self) -> sqlalchemy.sql.ColumnElement:
"""Column element for use in ORDER BY clause
(`sqlalchemy.sql.ColumnElement`)
"""
return self.column.asc() if self.ordering else self.column.desc()

def materialized(self, table: sqlalchemy.schema.Table) -> OrderByColumn:
"""Re-purpose ordering column definition for a materialized table.
Parameters
----------
table : `sqlalchemy.schema.Table`
Materialized table, it should have all columns in SELECT clause
already.
Returns
-------
column : `OrderByColumn`
Column definition to use with ORDER BY in materialized table.
"""
return OrderByColumn(
column=table.columns[self.dimension.name if self.dimension else self.column.name],
ordering=self.ordering,
add_to_select=False,
field_spec=None,
dimension=self.dimension,
)


class Query(ABC):
"""An abstract base class for queries that return some combination of
Expand Down Expand Up @@ -857,20 +824,18 @@ def sql(self) -> sqlalchemy.sql.FromClause:
if datasetColumns is not None:
simpleQuery.columns.extend(datasetColumns)

assert not simpleQuery.order_by, "Input query cannot have ORDER BY"
if self._order_by_columns:
# add ORDER BY columns
select_columns = [column.column for column in self._order_by_columns if column.add_to_select]
simpleQuery.columns.extend(select_columns)
sql = simpleQuery.combine()
# add ORDER BY column
order_by_columns = [column.column_order for column in self._order_by_columns]
sql = sql.order_by(*order_by_columns)
else:
sql = simpleQuery.combine()
order_by_column = sqlalchemy.func.row_number().over(order_by=order_by_columns).label("_orderby")
simpleQuery.columns.append(order_by_column)
simpleQuery.order_by = [order_by_column]

assert simpleQuery.limit is None, "Input query cannot have LIMIT"
simpleQuery.limit = self._limit

if self._limit:
sql = sql.limit(self._limit[0])
if self._limit[1] is not None:
sql = sql.offset(self._limit[1])
sql = simpleQuery.combine()

if self._uniqueness is DirectQueryUniqueness.NEEDS_DISTINCT:
return sql.distinct()
Expand Down Expand Up @@ -911,10 +876,16 @@ def _makeTableSpec(self, constraints: bool = False) -> ddl.TableSpec:
self.managers.datasets.addDatasetForeignKey(spec, primaryKey=unique, constraint=constraints)
self.managers.collections.addRunForeignKey(spec, nullable=False, constraint=constraints)

# may need few extra columns from ORDER BY
spec.fields.update(
column.field_spec for column in self._order_by_columns if column.field_spec is not None
)
# Need a column for ORDER BY if ordering is requested
if self._order_by_columns:
spec.fields.add(
ddl.FieldSpec(
name="_orderby",
dtype=sqlalchemy.BigInteger,
nullable=False,
doc="Column to use with ORDER BY",
)
)

return spec

Expand All @@ -926,7 +897,6 @@ def materialize(self, db: Database) -> Iterator[Query]:
table = session.makeTemporaryTable(spec)
if not self._doomed_by:
db.insert(table, select=self.sql, names=spec.fields.names)
order_by_columns = [column.materialized(table) for column in self._order_by_columns]
yield MaterializedQuery(
table=table,
spatial=self.spatial,
Expand All @@ -936,7 +906,6 @@ def materialize(self, db: Database) -> Iterator[Query]:
whereRegion=self.whereRegion,
managers=self.managers,
doomed_by=self._doomed_by,
order_by_columns=order_by_columns,
)
session.dropTemporaryTable(table)

Expand Down Expand Up @@ -1010,9 +979,6 @@ class MaterializedQuery(Query):
A list of messages (appropriate for e.g. logging or exceptions) that
explain why the query is known to return no results even before it is
executed. Queries with a non-empty list will never be executed.
order_by : `Tuple` [ `str` ], optional
Optional list of column names to use in ORDER BY clause, names can be
prefixed with minus sign for descending ordering.
"""

def __init__(
Expand All @@ -1026,14 +992,12 @@ def __init__(
whereRegion: Optional[Region],
managers: RegistryManagers,
doomed_by: Iterable[str] = (),
order_by_columns: Iterable[OrderByColumn] = (),
):
super().__init__(graph=graph, whereRegion=whereRegion, managers=managers, doomed_by=doomed_by)
self._table = table
self._spatial = tuple(spatial)
self._datasetType = datasetType
self._isUnique = isUnique
self._order_by_columns = order_by_columns

def isUnique(self) -> bool:
# Docstring inherited from Query.
Expand All @@ -1048,17 +1012,6 @@ def spatial(self) -> Iterator[DimensionElement]:
# Docstring inherited from Query.
return iter(self._spatial)

def order_by(self, *args: str) -> Query:
# Docstring inherited from Query.
raise NotImplementedError("MaterializedQuery.order_by should not be called directly")

def limit(self, limit: int, offset: Optional[int] = None) -> Query:
# Docstring inherited from Query.

# Calling limit on materialized data is likely an error, limit should
# be set before materializing.
raise NotImplementedError("MaterializedQuery.limit should not be called directly")

def getRegionColumn(self, name: str) -> sqlalchemy.sql.ColumnElement:
# Docstring inherited from Query.
return self._table.columns[f"{name}_region"]
Expand All @@ -1079,9 +1032,8 @@ def getDatasetColumns(self) -> Optional[DatasetQueryColumns]:
def sql(self) -> sqlalchemy.sql.FromClause:
# Docstring inherited from Query.
select = self._table.select()
if self._order_by_columns:
order_by_columns = [column.column_order for column in self._order_by_columns]
select = select.order_by(*order_by_columns)
if "_orderby" in self._table.columns:
select = select.order_by(self._table.columns["_orderby"])
return select

@contextmanager
Expand Down Expand Up @@ -1165,14 +1117,6 @@ def spatial(self) -> Iterator[DimensionElement]:
# Docstring inherited from Query.
return iter(())

def order_by(self, *args: str) -> Query:
# Docstring inherited from Query.
return self

def limit(self, limit: int, offset: Optional[int] = None) -> Query:
# Docstring inherited from Query.
return self

def getRegionColumn(self, name: str) -> sqlalchemy.sql.ColumnElement:
# Docstring inherited from Query.
raise KeyError(f"No region for {name} in query (no regions at all, actually).")
Expand Down
28 changes: 11 additions & 17 deletions python/lsst/daf/butler/registry/queries/_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,14 @@ def constrain(self, query: SimpleQuery, columns: Callable[[str], sqlalchemy.sql.
]
),
)
if self._order_by:
# If ordering was specified for this result set then we assume
# that the same ordering is needed for the query that we are
# constraining. Add the ordering column to SELECT and ORDER
# BY clauses.
order_column = fromClause.columns["_orderby"].label("_orderby")
query.columns.append(order_column)
query.order_by = [order_column]

def findDatasets(
self, datasetType: Union[DatasetType, str], collections: Any, *, findFirst: bool = True
Expand Down Expand Up @@ -1000,24 +1008,11 @@ class DatabaseDimensionRecordQueryResults(DimensionRecordQueryResults):
def __init__(self, dataIds: DataCoordinateQueryResults, recordStorage: DimensionRecordStorage):
self._dataIds = dataIds
self._recordStorage = recordStorage
self._order_by: Iterable[str] = ()

def __iter__(self) -> Iterator[DimensionRecord]:
# LIMIT is already applied at DataCoordinateQueryResults level
# (assumption here is that if DataId exists then dimension record
# exists too and their counts must be equal). We still need to make
# sure that ordering is applied to dimension records as well.
if not self._order_by:
return iter(self._recordStorage.fetch(self._dataIds))
else:
# fetch() method does not support ordering, for now do it hard way
# by fetching everything into memory and ordering by DataId
dataIds = self._dataIds.toSequence()
rec_map = {}
for rec in self._recordStorage.fetch(dataIds):
rec_map[rec.dataId] = rec
# TODO: Do we want to clean up dataIds that may be missing
return iter(rec_map[dataId] for dataId in dataIds)
# Both ordering and limit is handled by DataCoordinateQueryResults
# which propagates them to record storage.
return iter(self._recordStorage.fetch(self._dataIds))

def count(self, *, exact: bool = True) -> int:
# Docstring inherited from base class.
Expand All @@ -1030,7 +1025,6 @@ def any(self, *, execute: bool = True, exact: bool = True) -> bool:
def order_by(self, *args: str) -> DimensionRecordQueryResults:
# Docstring inherited from base class.
self._dataIds = self._dataIds.order_by(*args)
self._order_by = args
return self

def limit(self, limit: int, offset: Optional[int] = None) -> DimensionRecordQueryResults:
Expand Down

0 comments on commit 8d7bf43

Please sign in to comment.