Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-31472: Implement batching of queries with long IN clause #562

Merged
merged 1 commit into from
Aug 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
48 changes: 37 additions & 11 deletions python/lsst/daf/butler/registry/opaque.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@

__all__ = ["ByNameOpaqueTableStorage", "ByNameOpaqueTableStorageManager"]

import itertools
from typing import (
Any,
ClassVar,
Dict,
Iterable,
Iterator,
List,
Optional,
)

Expand Down Expand Up @@ -79,21 +81,45 @@ def insert(self, *data: dict) -> None:

def fetch(self, **where: Any) -> Iterator[dict]:
# Docstring inherited from OpaqueTableStorage.
sql = self._table.select()
if where:
clauses = []

def _batch_in_clause(column: sqlalchemy.schema.Column, values: Iterable[Any]
) -> Iterator[sqlalchemy.sql.expression.ClauseElement]:
"""Split one long IN clause into a series of shorter ones.
"""
in_limit = 1000
# We have to remove possible duplicates from values; and in many
# cases it should be helpful to order the items in the clause.
values = sorted(set(values))
for iposn in range(0, len(values), in_limit):
in_clause = column.in_(values[iposn:iposn + in_limit])
yield in_clause

def _batch_in_clauses(**where: Any) -> Iterator[sqlalchemy.sql.expression.ClauseElement]:
"""Generate a sequence of WHERE clauses with a limited number of
items in IN clauses.
"""
batches: List[Iterable[Any]] = []
for k, v in where.items():
column = self._table.columns[k]
if isinstance(v, (list, tuple, set)):
clause = column.in_(v)
batches.append(_batch_in_clause(column, v))
else:
clause = column == v
clauses.append(clause)
sql = sql.where(
sqlalchemy.sql.and_(*clauses)
)
for row in self._db.query(sql):
yield dict(row)
# single "batch" for a regular eq operator
batches.append([column == v])

for clauses in itertools.product(*batches):
yield sqlalchemy.sql.and_(*clauses)

sql = self._table.select()
if where:
# Split long IN clauses into shorter batches
for clause in _batch_in_clauses(**where):
sql_where = sql.where(clause)
for row in self._db.query(sql_where):
yield dict(row)
else:
for row in self._db.query(sql):
yield dict(row)

def delete(self, columns: Iterable[str], *rows: dict) -> None:
# Docstring inherited from OpaqueTableStorage.
Expand Down
13 changes: 13 additions & 0 deletions python/lsst/daf/butler/registry/tests/_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,19 @@ def testOpaque(self):
self.assertCountEqual(rows, list(registry.fetchOpaqueData(table)))
self.assertEqual(rows[0:1], list(registry.fetchOpaqueData(table, id=1)))
self.assertEqual(rows[1:2], list(registry.fetchOpaqueData(table, name="two")))
self.assertEqual(rows[0:1], list(registry.fetchOpaqueData(table, id=(1, 3), name=("one", "two"))))
self.assertEqual(rows, list(registry.fetchOpaqueData(table, id=(1, 2, 3))))
# Test very long IN clause which exceeds sqlite limit on number of
# parameters. SQLite says the limit is 32k but it looks like it is
# much higher.
self.assertEqual(rows, list(registry.fetchOpaqueData(table, id=list(range(300_000)))))
# Two IN clauses, each longer than 1k batch size, first with
# duplicates, second has matching elements in different batches (after
# sorting).
self.assertEqual(rows[0:2], list(registry.fetchOpaqueData(
table,
id=list(range(1000)) + list(range(100, 0, -1)),
name=["one"] + [f"q{i}" for i in range(2200)] + ["two"])))
self.assertEqual([], list(registry.fetchOpaqueData(table, id=1, name="two")))
registry.deleteOpaqueData(table, id=3)
self.assertCountEqual(rows[:2], list(registry.fetchOpaqueData(table)))
Expand Down