Skip to content

Commit

Permalink
Merge pull request #630 from lsst/tickets/DM-33164
Browse files Browse the repository at this point in the history
DM-33164: Reimplement `order_by` method for query results
  • Loading branch information
andy-slac committed Jan 13, 2022
2 parents 294c620 + 4df5dc9 commit 8f7140c
Show file tree
Hide file tree
Showing 7 changed files with 280 additions and 146 deletions.
20 changes: 19 additions & 1 deletion python/lsst/daf/butler/core/simpleQuery.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

__all__ = ("SimpleQuery",)

from typing import Any, ClassVar, List, Optional, Type, TypeVar, Union
from typing import Any, ClassVar, List, Optional, Tuple, Type, TypeVar, Union

import sqlalchemy

Expand All @@ -39,6 +39,8 @@ class SimpleQuery:
def __init__(self) -> None:
self.columns = []
self.where = []
self.order_by = []
self.limit = None
self._from: Optional[sqlalchemy.sql.FromClause] = None

class Select:
Expand Down Expand Up @@ -126,6 +128,12 @@ def combine(self) -> sqlalchemy.sql.Select:
result = result.select_from(self._from)
if self.where:
result = result.where(sqlalchemy.sql.and_(*self.where))
if self.order_by:
result = result.order_by(*self.order_by)
if self.limit:
result = result.limit(self.limit[0])
if self.limit[1] is not None:
result = result.offset(self.limit[1])
return result

@property
Expand All @@ -152,6 +160,8 @@ def copy(self) -> SimpleQuery:
result = SimpleQuery()
result.columns = list(self.columns)
result.where = list(self.where)
result.order_by = list(self.order_by)
result.limit = self.limit
result._from = self._from
return result

Expand All @@ -164,3 +174,11 @@ def copy(self) -> SimpleQuery:
"""Boolean expressions that will be combined with AND to form the WHERE
clause (`list` [ `sqlalchemy.sql.ColumnElement` ]).
"""

order_by: List[sqlalchemy.sql.ColumnElement]
"""Columns to appear in ORDER BY clause (`list`
[`sqlalchemy.sql.ColumnElement` ])
"""

limit: Optional[Tuple[int, Optional[int]]]
"""Limit on the number of returned rows and optional offset (`tuple`)"""
24 changes: 3 additions & 21 deletions python/lsst/daf/butler/registry/queries/_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@

__all__ = ("QueryBuilder",)

import dataclasses
from typing import AbstractSet, Any, Iterable, List, Optional

import sqlalchemy.sql

from ...core import DatasetType, Dimension, DimensionElement, SimpleQuery, SkyPixDimension, ddl
from ...core import DatasetType, Dimension, DimensionElement, SimpleQuery, SkyPixDimension
from ...core.named import NamedKeyDict, NamedValueAbstractSet, NamedValueSet
from .._collectionType import CollectionType
from ..wildcards import CollectionQuery, CollectionSearch
Expand Down Expand Up @@ -570,16 +569,11 @@ def _order_by_columns(self) -> Iterable[OrderByColumn]:
for order_by_column in self.summary.order_by.order_by_columns:

column: sqlalchemy.sql.ColumnElement
field_spec: Optional[ddl.FieldSpec]
dimension: Optional[Dimension] = None
if order_by_column.column is None:
# dimension name, it has to be in SELECT list already, only
# add it to ORDER BY
assert isinstance(order_by_column.element, Dimension), "expecting full Dimension"
column = self._columns.getKeyColumn(order_by_column.element)
add_to_select = False
field_spec = None
dimension = order_by_column.element
else:
table = self._elements[order_by_column.element]

Expand All @@ -592,25 +586,13 @@ def _order_by_columns(self) -> Iterable[OrderByColumn]:
else:
column = timespan_repr.upper()
label = f"{order_by_column.element.name}_timespan_end"
field_spec = ddl.FieldSpec(label, dtype=sqlalchemy.BigInteger, nullable=True)
else:
column = table.columns[order_by_column.column]
# make a unique label for it
label = f"{order_by_column.element.name}_{order_by_column.column}"
field_spec = order_by_column.element.RecordClass.fields.facts[order_by_column.column]
field_spec = dataclasses.replace(field_spec, name=label)

column = column.label(label)
add_to_select = True

order_by_columns.append(
OrderByColumn(
column=column,
ordering=order_by_column.ordering,
add_to_select=add_to_select,
field_spec=field_spec,
dimension=dimension,
)
)

order_by_columns.append(OrderByColumn(column=column, ordering=order_by_column.ordering))

return order_by_columns
98 changes: 21 additions & 77 deletions python/lsst/daf/butler/registry/queries/_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,46 +63,13 @@ class OrderByColumn:
ordering: bool
"""True for ascending order, False for descending (`bool`)."""

add_to_select: bool
"""True if columns is a non-key column and needs to be added to select
columns explicitly (`bool`)."""

field_spec: Optional[ddl.FieldSpec]
"""Field specification for a column in materialized table (`ddl.FieldSpec`)
"""

dimension: Optional[Dimension]
"""Not-None if column corresponds to a dimension (`Dimension` or `None`)"""

@property
def column_order(self) -> sqlalchemy.sql.ColumnElement:
"""Column element for use in ORDER BY clause
(`sqlalchemy.sql.ColumnElement`)
"""
return self.column.asc() if self.ordering else self.column.desc()

def materialized(self, table: sqlalchemy.schema.Table) -> OrderByColumn:
"""Re-purpose ordering column definition for a materialized table.
Parameters
----------
table : `sqlalchemy.schema.Table`
Materialized table, it should have all columns in SELECT clause
already.
Returns
-------
column : `OrderByColumn`
Column definition to use with ORDER BY in materialized table.
"""
return OrderByColumn(
column=table.columns[self.dimension.name if self.dimension else self.column.name],
ordering=self.ordering,
add_to_select=False,
field_spec=None,
dimension=self.dimension,
)


class Query(ABC):
"""An abstract base class for queries that return some combination of
Expand Down Expand Up @@ -857,20 +824,18 @@ def sql(self) -> sqlalchemy.sql.FromClause:
if datasetColumns is not None:
simpleQuery.columns.extend(datasetColumns)

assert not simpleQuery.order_by, "Input query cannot have ORDER BY"
if self._order_by_columns:
# add ORDER BY columns
select_columns = [column.column for column in self._order_by_columns if column.add_to_select]
simpleQuery.columns.extend(select_columns)
sql = simpleQuery.combine()
# add ORDER BY column
order_by_columns = [column.column_order for column in self._order_by_columns]
sql = sql.order_by(*order_by_columns)
else:
sql = simpleQuery.combine()
order_by_column = sqlalchemy.func.row_number().over(order_by=order_by_columns).label("_orderby")
simpleQuery.columns.append(order_by_column)
simpleQuery.order_by = [order_by_column]

assert simpleQuery.limit is None, "Input query cannot have LIMIT"
simpleQuery.limit = self._limit

if self._limit:
sql = sql.limit(self._limit[0])
if self._limit[1] is not None:
sql = sql.offset(self._limit[1])
sql = simpleQuery.combine()

if self._uniqueness is DirectQueryUniqueness.NEEDS_DISTINCT:
return sql.distinct()
Expand Down Expand Up @@ -911,10 +876,16 @@ def _makeTableSpec(self, constraints: bool = False) -> ddl.TableSpec:
self.managers.datasets.addDatasetForeignKey(spec, primaryKey=unique, constraint=constraints)
self.managers.collections.addRunForeignKey(spec, nullable=False, constraint=constraints)

# may need few extra columns from ORDER BY
spec.fields.update(
column.field_spec for column in self._order_by_columns if column.field_spec is not None
)
# Need a column for ORDER BY if ordering is requested
if self._order_by_columns:
spec.fields.add(
ddl.FieldSpec(
name="_orderby",
dtype=sqlalchemy.BigInteger,
nullable=False,
doc="Column to use with ORDER BY",
)
)

return spec

Expand All @@ -926,7 +897,6 @@ def materialize(self, db: Database) -> Iterator[Query]:
table = session.makeTemporaryTable(spec)
if not self._doomed_by:
db.insert(table, select=self.sql, names=spec.fields.names)
order_by_columns = [column.materialized(table) for column in self._order_by_columns]
yield MaterializedQuery(
table=table,
spatial=self.spatial,
Expand All @@ -936,7 +906,6 @@ def materialize(self, db: Database) -> Iterator[Query]:
whereRegion=self.whereRegion,
managers=self.managers,
doomed_by=self._doomed_by,
order_by_columns=order_by_columns,
)
session.dropTemporaryTable(table)

Expand Down Expand Up @@ -1010,9 +979,6 @@ class MaterializedQuery(Query):
A list of messages (appropriate for e.g. logging or exceptions) that
explain why the query is known to return no results even before it is
executed. Queries with a non-empty list will never be executed.
order_by : `Tuple` [ `str` ], optional
Optional list of column names to use in ORDER BY clause, names can be
prefixed with minus sign for descending ordering.
"""

def __init__(
Expand All @@ -1026,14 +992,12 @@ def __init__(
whereRegion: Optional[Region],
managers: RegistryManagers,
doomed_by: Iterable[str] = (),
order_by_columns: Iterable[OrderByColumn] = (),
):
super().__init__(graph=graph, whereRegion=whereRegion, managers=managers, doomed_by=doomed_by)
self._table = table
self._spatial = tuple(spatial)
self._datasetType = datasetType
self._isUnique = isUnique
self._order_by_columns = order_by_columns

def isUnique(self) -> bool:
# Docstring inherited from Query.
Expand All @@ -1048,17 +1012,6 @@ def spatial(self) -> Iterator[DimensionElement]:
# Docstring inherited from Query.
return iter(self._spatial)

def order_by(self, *args: str) -> Query:
# Docstring inherited from Query.
raise NotImplementedError("MaterializedQuery.order_by should not be called directly")

def limit(self, limit: int, offset: Optional[int] = None) -> Query:
# Docstring inherited from Query.

# Calling limit on materialized data is likely an error, limit should
# be set before materializing.
raise NotImplementedError("MaterializedQuery.limit should not be called directly")

def getRegionColumn(self, name: str) -> sqlalchemy.sql.ColumnElement:
# Docstring inherited from Query.
return self._table.columns[f"{name}_region"]
Expand All @@ -1079,9 +1032,8 @@ def getDatasetColumns(self) -> Optional[DatasetQueryColumns]:
def sql(self) -> sqlalchemy.sql.FromClause:
# Docstring inherited from Query.
select = self._table.select()
if self._order_by_columns:
order_by_columns = [column.column_order for column in self._order_by_columns]
select = select.order_by(*order_by_columns)
if "_orderby" in self._table.columns:
select = select.order_by(self._table.columns["_orderby"])
return select

@contextmanager
Expand Down Expand Up @@ -1165,14 +1117,6 @@ def spatial(self) -> Iterator[DimensionElement]:
# Docstring inherited from Query.
return iter(())

def order_by(self, *args: str) -> Query:
# Docstring inherited from Query.
return self

def limit(self, limit: int, offset: Optional[int] = None) -> Query:
# Docstring inherited from Query.
return self

def getRegionColumn(self, name: str) -> sqlalchemy.sql.ColumnElement:
# Docstring inherited from Query.
raise KeyError(f"No region for {name} in query (no regions at all, actually).")
Expand Down

0 comments on commit 8f7140c

Please sign in to comment.