Skip to content

Commit

Permalink
Merge pull request #544 from lsst/tickets/DM-30124
Browse files Browse the repository at this point in the history
DM-30124: Use native timestamp comparison for ingest_date
  • Loading branch information
andy-slac committed Jul 5, 2021
2 parents 2b48f09 + 727959a commit 900adb8
Show file tree
Hide file tree
Showing 5 changed files with 260 additions and 23 deletions.
6 changes: 6 additions & 0 deletions doc/changes/DM-30124.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Fix handling of ingest_date timestamps.

Previously there was an inconsistency between ingest_date database-native UTC
handling and astropy Time used for time literals, which resulted in a
37-second difference. This update makes consistent use of database-native time
functions to resolve this issue.
8 changes: 4 additions & 4 deletions doc/lsst.daf.butler/queries.rst
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,8 @@ Here are few examples for checking containment in a time range:
.. code-block:: sql
-- using literals for both timestamp and time range
T'2020-01-01' IN (T'2019-01-01', '2020-01-01')
(T'2020-01-01', T'2020-02-01') NOT IN (T'2019-01-01', '2020-01-01')
T'2020-01-01' IN (T'2019-01-01', T'2020-01-01')
(T'2020-01-01', T'2020-02-01') NOT IN (T'2019-01-01', T'2020-01-01')
-- using identifiers for each timestamp in a time range
T'2020-01-01' IN (interval.begin, interval.end)
Expand All @@ -255,7 +255,7 @@ OVERLAPS operator
^^^^^^^^^^^^^^^^^

The ``OVERLAPS`` operator checks for overlapping time ranges or regions, its
argument have to have consistent types. Like with ``IN`` operator time ranges
arguments have to have consistent types. Like with ``IN`` operator time ranges
can be represented with a tuple of two timestamps (literals or identifiers) or
with a single identifier. Regions can only be used as identifiers.
``OVERLAPS`` syntax is similar to ``IN`` but it does not require parentheses
Expand All @@ -266,7 +266,7 @@ Few examples of the syntax:

.. code-block:: sql
(T'2020-01-01', T'2022-01-01') OVERLAPS (T'2019-01-01', '2021-01-01')
(T'2020-01-01', T'2022-01-01') OVERLAPS (T'2019-01-01', T'2021-01-01')
(interval.begin, interval.end) OVERLAPS interval_2
interval_1 OVERLAPS interval_2
Expand Down
129 changes: 110 additions & 19 deletions python/lsst/daf/butler/registry/queries/expressions/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
)

from abc import ABC, abstractmethod
from datetime import datetime
import operator
from typing import (
Any,
Expand All @@ -42,10 +43,13 @@
TypeVar,
Union,
)
import warnings

from astropy.time import Time
import astropy.utils.exceptions
import sqlalchemy
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import func

from ....core import (
ddl,
Expand All @@ -60,6 +64,13 @@
from .parser import Node, TreeVisitor
from .categorize import categorizeElementId, categorizeConstant, ExpressionConstant

# As of astropy 4.2, the erfa interface is shipped independently and
# ErfaWarning is no longer an AstropyWarning
try:
import erfa
except ImportError:
erfa = None

if TYPE_CHECKING:
from .._structs import QueryColumns

Expand Down Expand Up @@ -115,22 +126,23 @@ class ExpressionTypeError(TypeError):


class _TimestampColumnElement(sqlalchemy.sql.ColumnElement):
"""Special ColumnElement type used for TIMESTAMP columns in expressions.
"""Special ColumnElement type used for TIMESTAMP columns or literals in
expressions.
TIMESTAMP columns in expressions are usually compared to time literals
which are `astropy.time.Time` instances that are converted to integer
nanoseconds since Epoch. For comparison we need to convert TIMESTAMP
column value to the same type. This type is a wrapper for actual column
that has special dialect-specific compilation methods defined below
transforming column in that common type.
SQLite stores timestamps as strings which sometimes can cause issues when
comparing strings. For more reliable comparison SQLite needs DATETIME()
wrapper for those strings. For PostgreSQL it works better if we add
TIMESTAMP to string literals.
This mechanism is only used for expressions in WHERE clause, values of the
TIMESTAMP columns returned from queries are still handled by standard
mechanism and they are converted to `datetime` instances.
"""
def __init__(self, column: sqlalchemy.sql.ColumnElement):
    def __init__(self, column: Optional[sqlalchemy.sql.ColumnElement] = None,
                 literal: Optional[datetime] = None):
        # Wraps either a TIMESTAMP column or a datetime literal; exactly one
        # of the two is expected — the dialect-specific compilers assert that
        # at least one is set. NOTE(review): nothing here rejects passing
        # both — confirm callers never do.
        super().__init__()
        self._column = column
        self._literal = literal


@compiles(_TimestampColumnElement, "sqlite")
Expand All @@ -140,7 +152,11 @@ def compile_timestamp_sqlite(element: Any, compiler: Any, **kw: Mapping[str, Any
SQLite defines ``strftime`` function that can be used to convert timestamp
value to Unix seconds.
"""
return f"STRFTIME('%s', {element._column.name})*1000000000"
assert element._column is not None or element._literal is not None, "Must have column or literal"
if element._column is not None:
return compiler.process(func.datetime(element._column), **kw)
else:
return compiler.process(func.datetime(sqlalchemy.sql.literal(element._literal)), **kw)


@compiles(_TimestampColumnElement, "postgresql")
Expand All @@ -149,7 +165,12 @@ def compile_timestamp_pg(element: Any, compiler: Any, **kw: Mapping[str, Any]) -
PostgreSQL can use `EXTRACT(epoch FROM timestamp)` function.
"""
return f"EXTRACT(epoch FROM {element._column.name})*1000000000"
assert element._column is not None or element._literal is not None, "Must have column or literal"
if element._column is not None:
return compiler.process(element._column, **kw)
else:
literal = element._literal.isoformat(sep=" ", timespec="microseconds")
return "TIMESTAMP " + compiler.process(sqlalchemy.sql.literal(literal), **kw)


class WhereClauseConverter(ABC):
Expand Down Expand Up @@ -298,7 +319,10 @@ def fromLiteral(cls, value: Any) -> ScalarWhereClauseConverter:
Converter instance that wraps ``value``.
"""
dtype = type(value)
column = sqlalchemy.sql.literal(value, type_=ddl.AstropyTimeNsecTai if dtype is Time else None)
if dtype is datetime:
column = _TimestampColumnElement(literal=value)
else:
column = sqlalchemy.sql.literal(value, type_=ddl.AstropyTimeNsecTai if dtype is Time else None)
return cls(column, value, dtype)

def finish(self, node: Node) -> sqlalchemy.sql.ColumnElement:
Expand Down Expand Up @@ -528,6 +552,61 @@ def adapted(lhs: WhereClauseConverter, rhs: WhereClauseConverter) -> WhereClause
return adapted


class TimeBinaryOperator:
    """Callable wrapper for a binary comparison between two time-valued
    operands, coercing them to a common type before applying the operator.

    Parameters
    ----------
    operator : `Callable`
        Binary callable (e.g. ``operator.__eq__``) applied to the two
        coerced SQLAlchemy column expressions.
    dtype : `type`
        Python type of the resulting expression (e.g. `bool` for
        comparisons).
    """

    def __init__(self, operator: Callable, dtype: type):
        self.operator = operator
        self.dtype = dtype

    def __call__(self, lhs: WhereClauseConverter, rhs: WhereClauseConverter) -> WhereClauseConverter:
        # Time comparison operators are only registered for scalar operands.
        assert isinstance(lhs, ScalarWhereClauseConverter)
        assert isinstance(rhs, ScalarWhereClauseConverter)
        # Bring both operands to the same time representation, then compare
        # the underlying SQLAlchemy column expressions.
        operands = [arg.column for arg in self.coerceTimes(lhs, rhs)]
        return ScalarWhereClauseConverter.fromExpression(self.operator(*operands), dtype=self.dtype)

    @classmethod
    def coerceTimes(cls, *args: ScalarWhereClauseConverter) -> List[ScalarWhereClauseConverter]:
        """Coerce one or more ScalarWhereClauseConverters to `datetime` type
        if necessary.

        If any of the arguments has `datetime` type then all other arguments
        are converted to `datetime` type as well.

        Parameters
        ----------
        *args : `ScalarWhereClauseConverter`
            Instances which represent time objects; their type can be one of
            `Time` or `datetime`. If coercion happens, then `Time` objects
            can only be literals, not expressions.

        Returns
        -------
        converters : `list` [ `ScalarWhereClauseConverter` ]
            List of converters in the same order as they appear in the
            argument list; some of them may be coerced to `datetime` type,
            non-coerced arguments are returned without any change.
        """

        def _coerce(arg: ScalarWhereClauseConverter) -> ScalarWhereClauseConverter:
            """Coerce a single ScalarWhereClauseConverter to a `datetime`
            literal, returning non-datetime arguments converted and
            datetime arguments unchanged.
            """
            if arg.dtype is not datetime:
                # Only literal Time values can be converted here; a Time
                # column expression has no Python value to convert.
                assert arg.value is not None, "Cannot coerce non-literals"
                assert arg.dtype is Time, "Cannot coerce non-Time literals"
                with warnings.catch_warnings():
                    # NOTE(review): to_datetime() can emit astropy/erfa
                    # warnings (presumably about scale conversion for
                    # out-of-range dates); they are deliberately suppressed
                    # here — confirm this matches the intent of DM-30124.
                    warnings.simplefilter("ignore", category=astropy.utils.exceptions.AstropyWarning)
                    if erfa is not None:
                        warnings.simplefilter("ignore", category=erfa.ErfaWarning)
                    dt = arg.value.to_datetime()
                arg = ScalarWhereClauseConverter.fromLiteral(dt)
            return arg

        # Coercion is only triggered when at least one operand is already a
        # native datetime (e.g. the ingest_date column or a datetime bind
        # value); otherwise the arguments pass through untouched.
        if any(arg.dtype is datetime for arg in args):
            return [_coerce(arg) for arg in args]
        else:
            return list(args)


class DispatchTable:
"""An object that manages unary- and binary-operator type-dispatch tables
for `WhereClauseConverter`.
Expand Down Expand Up @@ -721,17 +800,29 @@ def build(cls, TimespanReprClass: Type[TimespanDatabaseRepresentation]) -> Dispa
table.registerUnary("-", (int, float), operator.__neg__)
table.registerBinary("AND", bool, sqlalchemy.sql.and_)
table.registerBinary("OR", bool, sqlalchemy.sql.or_)
table.registerBinary("=", (int, float, str, Time), operator.__eq__, result=bool)
table.registerBinary("!=", (int, float, str, Time), operator.__ne__, result=bool)
table.registerBinary("<", (int, float, str, Time), operator.__lt__, result=bool)
table.registerBinary(">", (int, float, str, Time), operator.__gt__, result=bool)
table.registerBinary("<=", (int, float, str, Time), operator.__le__, result=bool)
table.registerBinary(">=", (int, float, str, Time), operator.__ge__, result=bool)
table.registerBinary("=", (int, float, str), operator.__eq__, result=bool)
table.registerBinary("!=", (int, float, str), operator.__ne__, result=bool)
table.registerBinary("<", (int, float, str), operator.__lt__, result=bool)
table.registerBinary(">", (int, float, str), operator.__gt__, result=bool)
table.registerBinary("<=", (int, float, str), operator.__le__, result=bool)
table.registerBinary(">=", (int, float, str), operator.__ge__, result=bool)
table.registerBinary("+", (int, float), operator.__add__)
table.registerBinary("-", (int, float), operator.__sub__)
table.registerBinary("*", (int, float), operator.__mul__)
table.registerBinary("/", (int, float), operator.__truediv__)
table.registerBinary("%", (int, float), operator.__mod__)
table.registerBinary("=", (Time, datetime), TimeBinaryOperator(operator.__eq__, bool),
rhs=(Time, datetime), adapt=False)
table.registerBinary("!=", (Time, datetime), TimeBinaryOperator(operator.__ne__, bool),
rhs=(Time, datetime), adapt=False)
table.registerBinary("<", (Time, datetime), TimeBinaryOperator(operator.__lt__, bool),
rhs=(Time, datetime), adapt=False)
table.registerBinary(">", (Time, datetime), TimeBinaryOperator(operator.__gt__, bool),
rhs=(Time, datetime), adapt=False)
table.registerBinary("<=", (Time, datetime), TimeBinaryOperator(operator.__le__, bool),
rhs=(Time, datetime), adapt=False)
table.registerBinary(">=", (Time, datetime), TimeBinaryOperator(operator.__ge__, bool),
rhs=(Time, datetime), adapt=False)
table.registerBinary(
"=",
lhs=(int, float, str, Time, type(None)),
Expand Down Expand Up @@ -906,8 +997,8 @@ def visitIdentifier(self, name: str, node: Node) -> WhereClauseConverter:
assert self.columns.datasets is not None
assert self.columns.datasets.ingestDate is not None, "dataset.ingest_date is not in the query"
return ScalarWhereClauseConverter.fromExpression(
_TimestampColumnElement(self.columns.datasets.ingestDate),
Time,
_TimestampColumnElement(column=self.columns.datasets.ingestDate),
datetime,
)
elif constant is ExpressionConstant.NULL:
return ScalarWhereClauseConverter.fromLiteral(None)
Expand Down
31 changes: 31 additions & 0 deletions python/lsst/daf/butler/registry/tests/_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

from abc import ABC, abstractmethod
from collections import defaultdict
from datetime import datetime, timedelta
import itertools
import logging
import os
Expand Down Expand Up @@ -1972,7 +1973,9 @@ def testIngestTimeQuery(self):

registry = self.makeRegistry()
self.loadData(registry, "base.yaml")
dt0 = datetime.utcnow()
self.loadData(registry, "datasets.yaml")
dt1 = datetime.utcnow()

datasets = list(registry.queryDatasets(..., collections=...))
len0 = len(datasets)
Expand All @@ -1989,6 +1992,34 @@ def testIngestTimeQuery(self):
len2 = len(datasets)
self.assertEqual(len2, 0)

# Check more exact timing to make sure there is no 37 seconds offset
# (after fixing DM-30124). SQLite time precision is 1 second, make
# sure that we don't test with higher precision.
tests = [
# format: (timestamp, operator, expected_len)
(dt0 - timedelta(seconds=1), ">", len0),
(dt0 - timedelta(seconds=1), "<", 0),
(dt1 + timedelta(seconds=1), "<", len0),
(dt1 + timedelta(seconds=1), ">", 0),
]
for dt, op, expect_len in tests:
dt_str = dt.isoformat(sep=" ")

where = f"ingest_date {op} T'{dt_str}'"
datasets = list(registry.queryDatasets(..., collections=..., where=where))
self.assertEqual(len(datasets), expect_len)

# same with bind using datetime or astropy Time
where = f"ingest_date {op} ingest_time"
datasets = list(registry.queryDatasets(..., collections=..., where=where,
bind={"ingest_time": dt}))
self.assertEqual(len(datasets), expect_len)

dt_astropy = astropy.time.Time(dt, format="datetime")
datasets = list(registry.queryDatasets(..., collections=..., where=where,
bind={"ingest_time": dt_astropy}))
self.assertEqual(len(datasets), expect_len)

def testTimespanQueries(self):
"""Test query expressions involving timespans.
"""
Expand Down

0 comments on commit 900adb8

Please sign in to comment.