Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-24355: make Database.sync (and methods that use it) transaction-friendly. #391

Merged
merged 4 commits into from
Oct 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 0 additions & 5 deletions python/lsst/daf/butler/registry/_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -1059,11 +1059,6 @@ def syncDimensionData(self, element: Union[DimensionElement, str],
ConflictingDefinitionError
Raised if the record exists in the database (according to primary
key lookup) but is inconsistent with the given one.
Notes
-----
This method cannot be called within transactions, as it needs to be
able to perform its own transaction to be concurrent.
"""
if conform:
if isinstance(element, str):
Expand Down
16 changes: 13 additions & 3 deletions python/lsst/daf/butler/registry/databases/postgresql.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import psycopg2
import sqlalchemy.dialects.postgresql

from ..interfaces import Database, ReadOnlyDatabaseError
from ..interfaces import Database
from ..nameShrinker import NameShrinker
from ...core import DatabaseTimespanRepresentation, ddl, Timespan, time_utils

Expand Down Expand Up @@ -144,8 +144,7 @@ def getTimespanRepresentation(cls) -> Type[DatabaseTimespanRepresentation]:
return _RangeTimespanRepresentation

def replace(self, table: sqlalchemy.schema.Table, *rows: dict) -> None:
if not (self.isWriteable() or table.key in self._tempTables):
raise ReadOnlyDatabaseError(f"Attempt to replace into read-only database '{self}'.")
self.assertTableWriteable(table, f"Cannot replace into read-only table {table}.")
if not rows:
return
# This uses special support for UPSERT in PostgreSQL backend:
Expand All @@ -161,6 +160,17 @@ def replace(self, table: sqlalchemy.schema.Table, *rows: dict) -> None:
query = query.on_conflict_do_update(constraint=table.primary_key, set_=data)
self._connection.execute(query, *rows)

def ensure(self, table: sqlalchemy.schema.Table, *rows: dict) -> int:
# Docstring inherited.
self.assertTableWriteable(table, f"Cannot ensure into read-only table {table}.")
if not rows:
return 0
# Like `replace`, this uses UPSERT, but it's a bit simpler because
# we don't care which constraint is violated or specify which columns
# to update.
query = sqlalchemy.dialects.postgresql.dml.insert(table).on_conflict_do_nothing()
return self._connection.execute(query, *rows).rowcount


class _RangeTimespanType(sqlalchemy.TypeDecorator):
"""A single-column `Timespan` representation usable only with
Expand Down
38 changes: 32 additions & 6 deletions python/lsst/daf/butler/registry/databases/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import sqlalchemy
import sqlalchemy.ext.compiler

from ..interfaces import Database, ReadOnlyDatabaseError, StaticTablesContext
from ..interfaces import Database, StaticTablesContext
from ...core import ddl


Expand Down Expand Up @@ -65,15 +65,16 @@ class _Replace(sqlalchemy.sql.Insert):
pass


# SQLite and PostgreSQL use similar syntax for their ON CONFLICT extension,
# but SQLAlchemy only knows about PostgreSQL's, so we have to compile some
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That seems odd that sqlite doesn't support it in sqlalchemy. Is it just that sqlalchemy team haven't got around to it or there are problems with it? Could we submit a patch?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's slightly new functionality in SQLite, but not that new. I imagine a patch would be appreciated, but it'd be a while before I got around to it.

# custom text SQL ourselves.

# Hard to infer what types these should be from SQLAlchemy docs; just disable
# static typing by calling everything "Any".
@sqlalchemy.ext.compiler.compiles(_Replace, "sqlite")
def _replace(insert: Any, compiler: Any, **kwargs: Any) -> Any:
"""Generate an INSERT ... ON CONFLICT REPLACE query.
"""
# SQLite and PostgreSQL use similar syntax for their ON CONFLICT extension,
# but SQLAlchemy only knows about PostgreSQL's, so we have to compile some
# custom text SQL ourselves.
result = compiler.visit_insert(insert, **kwargs)
preparer = compiler.preparer
pk_columns = ", ".join([preparer.format_column(col) for col in insert.table.primary_key])
Expand All @@ -85,6 +86,21 @@ def _replace(insert: Any, compiler: Any, **kwargs: Any) -> Any:
return result


class _Ensure(sqlalchemy.sql.Insert):
"""A SQLAlchemy query that compiles to INSERT ... ON CONFLICT DO NOTHING.
"""
pass


@sqlalchemy.ext.compiler.compiles(_Ensure, "sqlite")
def _ensure(insert: Any, compiler: Any, **kwargs: Any) -> Any:
"""Generate an INSERT ... ON CONFLICT DO NOTHING query.
"""
result = compiler.visit_insert(insert, **kwargs)
result += " ON CONFLICT DO NOTHING"
return result


_AUTOINCR_TABLE_SPEC = ddl.TableSpec(
fields=[ddl.FieldSpec(name="id", dtype=sqlalchemy.Integer, primaryKey=True)]
)
Expand Down Expand Up @@ -349,6 +365,7 @@ def insert(self, table: sqlalchemy.schema.Table, *rows: dict, returnIds: bool =
select: Optional[sqlalchemy.sql.Select] = None,
names: Optional[Iterable[str]] = None,
) -> Optional[List[int]]:
self.assertTableWriteable(table, f"Cannot insert into read-only table {table}.")
autoincr = self._autoincr.get(table.name)
if autoincr is not None:
if select is not None:
Expand Down Expand Up @@ -407,8 +424,7 @@ def insert(self, table: sqlalchemy.schema.Table, *rows: dict, returnIds: bool =
return super().insert(table, *rows, select=select, names=names, returnIds=returnIds)

def replace(self, table: sqlalchemy.schema.Table, *rows: dict) -> None:
if not (self.isWriteable() or table.key in self._tempTables):
raise ReadOnlyDatabaseError(f"Attempt to replace into read-only database '{self}'.")
self.assertTableWriteable(table, f"Cannot replace into read-only table {table}.")
if not rows:
return
if table.name in self._autoincr:
Expand All @@ -417,6 +433,16 @@ def replace(self, table: sqlalchemy.schema.Table, *rows: dict) -> None:
)
self._connection.execute(_Replace(table), *rows)

def ensure(self, table: sqlalchemy.schema.Table, *rows: dict) -> int:
self.assertTableWriteable(table, f"Cannot ensure into read-only table {table}.")
if not rows:
return 0
if table.name in self._autoincr:
raise NotImplementedError(
"ensure does not support compound primary keys with autoincrement fields."
)
return self._connection.execute(_Ensure(table), *rows).rowcount

filename: Optional[str]
"""Name of the file this database is connected to (`str` or `None`).
Expand Down
21 changes: 3 additions & 18 deletions python/lsst/daf/butler/registry/dimensions/spatial.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,24 +158,9 @@ def insert(self, *records: DimensionRecord) -> None:

def sync(self, record: DimensionRecord) -> bool:
# Docstring inherited from DimensionRecordStorage.sync.
inserted = super().sync(record)
if inserted:
try:
with self._db.transaction():
inserted = super().sync(record)
if inserted:
commonSkyPixRows = self._computeCommonSkyPixRows(record)
self._db.insert(self._commonSkyPixOverlapTable, *commonSkyPixRows)
except Exception as err:
# EEK. We've just failed to insert the overlap table rows
# after succesfully inserting the main dimension element table
# row, which means the database is now in a slightly
# inconsistent state.
# Note that we can't use transactions to solve this, because
# Database.sync needs to begin and commit its own transation;
# see also DM-24355.
raise RuntimeError(
f"Failed to add overlap records for {self.element} after "
f"successfully inserting the main row. This means the "
f"database is in an inconsistent state; please manually "
f"remove the row corresponding to data ID "
f"{record.dataId.byName()}."
) from err
return inserted
164 changes: 104 additions & 60 deletions python/lsst/daf/butler/registry/interfaces/_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,37 @@ def _lockTables(self, tables: Iterable[sqlalchemy.schema.Table] = ()) -> None:
"""
raise NotImplementedError()

def isTableWriteable(self, table: sqlalchemy.schema.Table) -> bool:
"""Check whether a table is writeable, either because the database
connection is read-write or the table is a temporary table.
Parameters
----------
table : `sqlalchemy.schema.Table`
SQLAlchemy table object to check.
Returns
-------
writeable : `bool`
Whether this table is writeable.
"""
return self.isWriteable() or table.key in self._tempTables

def assertTableWriteable(self, table: sqlalchemy.schema.Table, msg: str) -> None:
"""Raise if the given table is not writeable, either because the
database connection is read-write or the table is a temporary table.
Parameters
----------
table : `sqlalchemy.schema.Table`
SQLAlchemy table object to check.
msg : `str`, optional
If provided, raise `ReadOnlyDatabaseError` instead of returning
`False`, with this message.
"""
if not self.isTableWriteable(table):
raise ReadOnlyDatabaseError(msg)

@contextmanager
def declareStaticTables(self, *, create: bool) -> Iterator[StaticTablesContext]:
"""Return a context manager in which the database's static DDL schema
Expand Down Expand Up @@ -1009,9 +1040,11 @@ def sync(self, table: sqlalchemy.schema.Table, *,
Notes
-----
This method may not be called within transactions. It may be called on
read-only databases if and only if the matching row does in fact
already exist.
May be used inside transaction contexts, so implementations may not
perform operations that interrupt transactions.
It may be called on read-only databases if and only if the matching row
does in fact already exist.
"""

def check() -> Tuple[int, Optional[List[str]], Optional[List]]:
Expand Down Expand Up @@ -1069,74 +1102,52 @@ def safeNotEqual(a: Any, b: Any) -> bool:
toReturn = None
return 1, inconsistencies, toReturn

if self.isWriteable() or table.key in self._tempTables:
# Database is writeable. Try an insert first, but allow it to fail
# (in only specific ways).
if self.isTableWriteable(table):
# Try an insert first, but allow it to fail (in only specific
# ways).
row = keys.copy()
if compared is not None:
row.update(compared)
if extra is not None:
row.update(extra)
insertSql = table.insert().values(row)
try:
with self.transaction(interrupting=True):
self._connection.execute(insertSql)
# Need to perform check() for this branch inside the
# transaction, so we roll back an insert that didn't do
# what we expected. That limits the extent to which we
# can reduce duplication between this block and the other
# ones that perform similar logic.
n, bad, result = check()
if n < 1:
raise RuntimeError("Insertion in sync did not seem to affect table. This is a bug.")
elif n > 1:
raise RuntimeError(f"Keys passed to sync {keys.keys()} do not comprise a "
f"unique constraint for table {table.name}.")
elif bad:
raise RuntimeError(
f"Conflict ({bad}) in sync after successful insert; this is "
f"possible if the same table is being updated by a concurrent "
f"process that isn't using sync, but it may also be a bug in "
f"daf_butler."
)
# No exceptions, so it looks like we inserted the requested row
# successfully.
inserted = True
except sqlalchemy.exc.IntegrityError as err:
# Most likely cause is that an equivalent row already exists,
# but it could also be some other constraint. Query for the
# row we think we matched to resolve that question.
with self.transaction(lock=[table]):
inserted = bool(self.ensure(table, row))
# Need to perform check() for this branch inside the
# transaction, so we roll back an insert that didn't do
# what we expected. That limits the extent to which we
# can reduce duplication between this block and the other
# ones that perform similar logic.
n, bad, result = check()
if n < 1:
# There was no matched row; insertion failed for some
# completely different reason. Just re-raise the original
# IntegrityError.
raise
elif n > 2:
# There were multiple matched rows, which means we
# conflicted *and* the arguments were bad to begin with.
raise RuntimeError(
"Necessary insertion in sync did not seem to affect table. This is a bug."
)
elif n > 1:
raise RuntimeError(f"Keys passed to sync {keys.keys()} do not comprise a "
f"unique constraint for table {table.name}.") from err
f"unique constraint for table {table.name}.")
elif bad:
# No logic bug, but data conflicted on the keys given.
raise DatabaseConflictError(f"Conflict in sync for table "
f"{table.name} on column(s) {bad}.") from err
# The desired row is already present and consistent with what
# we tried to insert.
inserted = False
if inserted:
raise RuntimeError(
f"Conflict ({bad}) in sync after successful insert; this is "
"possible if the same table is being updated by a concurrent "
"process that isn't using sync, but it may also be a bug in "
"daf_butler."
)
else:
raise DatabaseConflictError(
f"Conflict in sync for table {table.name} on column(s) {bad}."
)
else:
assert not self._connection.in_transaction(), (
"Calling sync within a transaction block is an error even "
"on a read-only database."
)
# Database is not writeable; just see if the row exists.
n, bad, result = check()
if n < 1:
raise ReadOnlyDatabaseError("sync needs to insert, but database is read-only.")
elif n > 1:
raise RuntimeError("Keys passed to sync do not comprise a unique constraint.")
elif bad:
raise DatabaseConflictError(f"Conflict in sync on column(s) {bad}.")
raise DatabaseConflictError(
f"Conflict in sync for table {table.name} on column(s) {bad}."
)
inserted = False
if returning is None:
return None, inserted
Expand Down Expand Up @@ -1194,8 +1205,7 @@ def insert(self, table: sqlalchemy.schema.Table, *rows: dict, returnIds: bool =
May be used inside transaction contexts, so implementations may not
perform operations that interrupt transactions.
"""
if not (self.isWriteable() or table.key in self._tempTables):
raise ReadOnlyDatabaseError(f"Attempt to insert into read-only database '{self}'.")
self.assertTableWriteable(table, f"Cannot insert into read-only table {table}.")
if select is not None and (rows or returnIds):
raise TypeError("'select' is incompatible with passing value rows or returnIds=True.")
if not rows and select is None:
Expand Down Expand Up @@ -1249,6 +1259,42 @@ def replace(self, table: sqlalchemy.schema.Table, *rows: dict) -> None:
"""
raise NotImplementedError()

@abstractmethod
def ensure(self, table: sqlalchemy.schema.Table, *rows: dict) -> int:
"""Insert one or more rows into a table, skipping any rows for which
insertion would violate any constraint.
Parameters
----------
table : `sqlalchemy.schema.Table`
Table rows should be inserted into.
*rows
Positional arguments are the rows to be inserted, as dictionaries
mapping column name to value. The keys in all dictionaries must
be the same.
Returns
-------
count : `int`
The number of rows actually inserted.
Raises
------
ReadOnlyDatabaseError
Raised if `isWriteable` returns `False` when this method is called.
This is raised even if the operation would do nothing even on a
writeable database.
Notes
-----
May be used inside transaction contexts, so implementations may not
perform operations that interrupt transactions.
Implementations are not required to support `ensure` on tables
with autoincrement keys.
"""
raise NotImplementedError()

def delete(self, table: sqlalchemy.schema.Table, columns: Iterable[str], *rows: dict) -> int:
"""Delete one or more rows from a table.
Expand Down Expand Up @@ -1283,8 +1329,7 @@ def delete(self, table: sqlalchemy.schema.Table, columns: Iterable[str], *rows:
The default implementation should be sufficient for most derived
classes.
"""
if not (self.isWriteable() or table.key in self._tempTables):
raise ReadOnlyDatabaseError(f"Attempt to delete from read-only database '{self}'.")
self.assertTableWriteable(table, f"Cannot delete from read-only table {table}.")
if columns and not rows:
# If there are no columns, this operation is supposed to delete
# everything (so we proceed as usual). But if there are columns,
Expand Down Expand Up @@ -1335,8 +1380,7 @@ def update(self, table: sqlalchemy.schema.Table, where: Dict[str, str], *rows: d
The default implementation should be sufficient for most derived
classes.
"""
if not (self.isWriteable() or table.key in self._tempTables):
raise ReadOnlyDatabaseError(f"Attempt to update read-only database '{self}'.")
self.assertTableWriteable(table, f"Cannot update read-only table {table}.")
if not rows:
return 0
sql = table.update().where(
Expand Down