Merge pull request #506 from lsst/tickets/DM-29562
DM-29562: Rewrite dataset subquery logic to make it easier on query planners.
TallJimbo committed Apr 12, 2021
2 parents d8d4588 + abcb06c commit 0c8a5d4
Showing 1 changed file with 105 additions and 44 deletions.
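
To make the intent of the rewrite concrete, here is a minimal, self-contained SQLAlchemy Core sketch of the query shape the new select() aims for. The table layouts, column names, and the build_query() helper are simplified stand-ins invented for illustration (not the actual byDimensions schema or the SimpleQuery API): start from the tags/calibs table, join the static dataset table only when something like the run is actually needed from it, and repeat the dataset_type_id constraint on both tables so the planner can use each table's index.

import sqlalchemy

# Simplified stand-ins for the static dataset table and the dynamic tags table.
metadata = sqlalchemy.MetaData()
static_dataset = sqlalchemy.Table(
    "dataset", metadata,
    sqlalchemy.Column("id", sqlalchemy.BigInteger, primary_key=True),
    sqlalchemy.Column("dataset_type_id", sqlalchemy.Integer),
    sqlalchemy.Column("run", sqlalchemy.String),
    sqlalchemy.Column("ingest_date", sqlalchemy.TIMESTAMP),
)
tags = sqlalchemy.Table(
    "dataset_tags", metadata,
    sqlalchemy.Column("dataset_id", sqlalchemy.BigInteger),
    sqlalchemy.Column("dataset_type_id", sqlalchemy.Integer),
    sqlalchemy.Column("collection", sqlalchemy.String),
)

def build_query(dataset_type_id: int, collection: str, need_run: bool = False):
    # Start from the tags table alone; it already carries the data ID and the
    # collection, so many searches never need the static table at all.
    columns = [tags.columns.dataset_id.label("id")]
    where = [
        tags.columns.dataset_type_id == dataset_type_id,
        tags.columns.collection == collection,
    ]
    from_clause = tags
    if need_run:
        # Join the static table only when the run must come from it, and
        # repeat the dataset_type_id constraint there as well, even though
        # the join itself is only on dataset_id.
        from_clause = tags.join(
            static_dataset,
            onclause=tags.columns.dataset_id == static_dataset.columns.id,
        )
        columns.append(static_dataset.columns.run)
        where.append(static_dataset.columns.dataset_type_id == dataset_type_id)
    return sqlalchemy.select(*columns).select_from(from_clause).where(*where)

# Example: prints a SELECT over the join with both dataset_type_id constraints.
print(build_query(5, "some-collection", need_run=True))
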
149 changes: 105 additions & 44 deletions python/lsst/daf/butler/registry/datasets/byDimensions/_storage.py
@@ -47,6 +47,7 @@ class ByDimensionsDatasetRecordStorage(DatasetRecordStorage):
Instances of this class should never be constructed directly; use
`DatasetRecordStorageManager.register` instead.
"""

def __init__(self, *, datasetType: DatasetType,
db: Database,
dataset_type_id: int,
@@ -73,7 +74,11 @@ def find(self, collection: CollectionRecord, dataId: DataCoordinate,
raise TypeError(f"Cannot search for dataset in CALIBRATION collection {collection.name} "
f"without an input timespan.")
sql = self.select(collection=collection, dataId=dataId, id=SimpleQuery.Select,
run=SimpleQuery.Select, timespan=timespan).combine()
run=SimpleQuery.Select, timespan=timespan)
if sql is None:
return None
else:
sql = sql.combine()
results = self._db.query(sql)
row = results.fetchone()
if row is None:
@@ -301,52 +306,53 @@ def select(self, collection: CollectionRecord,
run: SimpleQuery.Select.Or[None] = SimpleQuery.Select,
timespan: SimpleQuery.Select.Or[Optional[Timespan]] = SimpleQuery.Select,
ingestDate: SimpleQuery.Select.Or[Optional[Timespan]] = None,
) -> SimpleQuery:
) -> Optional[SimpleQuery]:
# Docstring inherited from DatasetRecordStorage.
assert collection.type is not CollectionType.CHAINED
#
# There are two tables in play here:
#
# - the static dataset table (with the dataset ID, dataset type ID,
# run ID/name, and ingest date);
#
# - the dynamic tags/calibs table (with the dataset ID, dataset type ID,
# collection ID/name, data ID, and possibly validity range).
#
# That means that we might want to return a query against either table
# or a JOIN of both, depending on which quantities the caller wants.
# But this method is documented/typed such that ``dataId`` is never
# `None` - i.e. we always constrain or retrieve the data ID. That
# means we'll always include the tags/calibs table and join in the
# static dataset table only if we need things from it that we can't get
# from the tags/calibs table.
#
# Note that it's important that we include a WHERE constraint on both
# tables for any column (e.g. dataset_type_id) that is in both when
# it's given explicitly; not doing so can prevent the query planner from
# using very important indexes. At present, we don't include those
# redundant columns in the JOIN ON expression, however, because the
# FOREIGN KEY (and its index) are defined only on dataset_id.
#
# We'll start with an empty SimpleQuery, and accumulate kwargs to pass
# to its `join` method when we bring in the tags/calibs table.
query = SimpleQuery()
# We always include the _static.dataset table, and we can always get
# the id and run fields from that; passing them as kwargs here tells
# SimpleQuery to handle them whether they're constraints or results.
# We always constrain the dataset_type_id here as well.
static_kwargs = {self._runKeyColumn: run}
if ingestDate is not None:
static_kwargs["ingest_date"] = SimpleQuery.Select
query.join(
self._static.dataset,
id=id,
dataset_type_id=self._dataset_type_id,
**static_kwargs
)
# If and only if the collection is a RUN, we constrain it in the static
# table (and also the tags or calibs table below)
if collection.type is CollectionType.RUN:
query.where.append(self._static.dataset.columns[self._runKeyColumn]
== collection.key)
# We get or constrain the data ID from the tags/calibs table, but
# We get the data ID or constrain it in the tags/calibs table, but
# that's multiple columns, not one, so we need to transform the one
# Select.Or argument into a dictionary of them.
kwargs: Dict[str, Any]
if dataId is SimpleQuery.Select:
kwargs = {dim.name: SimpleQuery.Select for dim in self.datasetType.dimensions.required}
else:
kwargs = dict(dataId.byName())
# We always constrain (never retrieve) the collection from the tags
# table.
# We always constrain (never retrieve) the collection in the
# tags/calibs table.
kwargs[self._collections.getCollectionForeignKeyName()] = collection.key
# constrain ingest time
if isinstance(ingestDate, Timespan):
# Timespan is astropy Time (usually in TAI) and ingest_date is
# TIMESTAMP, convert values to Python datetime for sqlalchemy.
if ingestDate.isEmpty():
raise RuntimeError("Empty timespan constraint provided for ingest_date.")
if ingestDate.begin is not None:
begin = ingestDate.begin.utc.datetime # type: ignore
query.where.append(self._static.dataset.ingest_date >= begin)
if ingestDate.end is not None:
end = ingestDate.end.utc.datetime # type: ignore
query.where.append(self._static.dataset.ingest_date < end)
# And now we finally join in the tags or calibs table.
# We always constrain (never retrieve) the dataset type in at least the
# tags/calibs table.
kwargs["dataset_type_id"] = self._dataset_type_id
# Join in the tags or calibs table, turning those 'kwargs' entries into
# WHERE constraints or SELECT columns as appropriate.
if collection.type is CollectionType.CALIBRATION:
assert self._calibs is not None, \
"DatasetTypes with isCalibration() == False can never be found in a CALIBRATION collection."
@@ -361,17 +367,72 @@ def select(self, collection: CollectionRecord,
TimespanReprClass.fromLiteral(timespan)
)
)
query.join(
self._calibs,
onclause=(self._static.dataset.columns.id == self._calibs.columns.dataset_id),
**kwargs
)
query.join(self._calibs, **kwargs)
dataset_id_col = self._calibs.columns.dataset_id
else:
query.join(self._tags, **kwargs)
dataset_id_col = self._tags.columns.dataset_id
# We can always get the dataset_id from the tags/calibs table or
# constrain it there. Can't use kwargs for that because we need to
# alias it to 'id'.
if id is SimpleQuery.Select:
query.columns.append(dataset_id_col.label("id"))
elif id is not None:
query.where.append(dataset_id_col == id)
# It's possible we now have everything we need, from just the
# tags/calibs table. The things we might need to get from the static
# dataset table are the run key and the ingest date.
need_static_table = False
static_kwargs = {}
if run is not None:
if collection.type is CollectionType.RUN:
if run is SimpleQuery.Select:
# If the collection we're searching is a RUN, we know that
# if we find the dataset in that collection, then that's
# the dataset's run; we don't need to query for it.
query.columns.append(sqlalchemy.sql.literal(collection.key).label(self._runKeyColumn))
elif run != collection.name:
# This [sub]query is doomed to yield no results; dataset
# cannot be in more than one run.
return None
else:
query.where.append(self._static.dataset.columns[self._runKeyColumn] == collection.key)
else:
static_kwargs[self._runKeyColumn] = (
SimpleQuery.Select if run is SimpleQuery.Select else self._collections.find(run).key
)
need_static_table = True
# Ingest date can only come from the static table.
if ingestDate is not None:
need_static_table = True
if ingestDate is SimpleQuery.Select:
static_kwargs["ingest_date"] = SimpleQuery.Select
else:
assert isinstance(ingestDate, Timespan)
# Timespan is astropy Time (usually in TAI) and ingest_date is
# TIMESTAMP, convert values to Python datetime for sqlalchemy.
if ingestDate.isEmpty():
raise RuntimeError("Empty timespan constraint provided for ingest_date.")
if ingestDate.begin is not None:
begin = ingestDate.begin.utc.datetime # type: ignore
query.where.append(self._static.dataset.columns.ingest_date >= begin)
if ingestDate.end is not None:
end = ingestDate.end.utc.datetime # type: ignore
query.where.append(self._static.dataset.columns.ingest_date < end)
# If we need the static table, join it in via dataset_id and
# dataset_type_id
if need_static_table:
query.join(
self._tags,
onclause=(self._static.dataset.columns.id == self._tags.columns.dataset_id),
**kwargs
)
query.join(
self._static.dataset,
onclause=(dataset_id_col == self._static.dataset.columns.id),
**static_kwargs,
)
# Also constrain dataset_type_id in static table in case that helps
# generate a better plan.
# We could also include this in the JOIN ON clause, but my guess is
# that that's a good idea IFF it's in the foreign key, and right
# now it isn't.
query.where.append(self._static.dataset.columns.dataset_type_id == self._dataset_type_id)
return query

def getDataId(self, id: DatasetId) -> DataCoordinate:
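
For context on the caller-side change in find() above: because select() can now return None when its constraints are provably contradictory (for example, an explicit run that differs from the RUN collection being searched), callers short-circuit instead of issuing a query that cannot match. A rough, hypothetical sketch of that pattern follows; the fetch_first helper and its execute callback are made up for illustration and are not part of daf_butler.

from typing import Any, Callable, Optional

def fetch_first(sql: Optional[Any], execute: Callable[[Any], Any]) -> Optional[Any]:
    # None from select() means "provably empty": skip the database round trip.
    if sql is None:
        return None
    # Otherwise combine() the SimpleQuery into an executable SELECT and run it,
    # mirroring what find() now does before fetching a single row.
    results = execute(sql.combine())
    return results.fetchone()
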