Merge pull request #386 from zodb/issue381
Fix #385 by eliminating the multiple result sets for lock_objects_and
jamadden committed Nov 22, 2019
2 parents bc0bf50 + 8922284 commit 6734406
Showing 16 changed files with 298 additions and 86 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -52,7 +52,7 @@ env:
script:
  # coverage slows PyPy down from 2minutes to 12+.
  - if [[ $TRAVIS_PYTHON_VERSION == 3.7 ]]; then pylint --rcfile=.pylintrc relstorage -f parseable -r n; fi
-  - if [[ $TRAVIS_PYTHON_VERSION != pypy* ]]; then coverage run -p --concurrency=greenlet .travis/zope_testrunner_gevent.py -t check7 -t check2 -t BlobCache -t Switches --layer gevent; fi
+  - if [[ $TRAVIS_PYTHON_VERSION != pypy* ]]; then coverage run -p --concurrency=greenlet .travis/zope_testrunner_gevent.py -t checkBTreesLengthStress -t check7 -t check2 -t BlobCache -t Switches --layer gevent; fi
  - if [[ $TRAVIS_PYTHON_VERSION == pypy* ]]; then python $RS_TEST_CMD --layer "!gevent"; fi
  - if [[ $TRAVIS_PYTHON_VERSION != pypy* ]]; then coverage run -p --concurrency=thread $RS_TEST_CMD --layer "!gevent"; fi
  # Make sure we can import without zope.schema, which is intended to
15 changes: 14 additions & 1 deletion CHANGES.rst
@@ -10,9 +10,22 @@

- MySQL+gevent: Release the critical section a bit sooner. See :issue:`381`.

-- SQLite+gevent: Fix possible deadlocks with gevent if switchers
+- SQLite+gevent: Fix possible deadlocks with gevent if switches
  occurred at unexpected times. See :issue:`382`.

+- MySQL+gevent: Fix possible deadlocks with gevent if switches
+  occurred at unexpected times. See :issue:`385`. This also included
+  some minor optimizations.
+
+.. caution::
+
+   This introduces a change in a stored procedure that is not
+   compatible with older versions of RelStorage. When this version
+   is first deployed, if there are older versions of RelStorage
+   still running, they will be unable to commit. They will fail with
+   a transient conflict error; they may attempt retries, but will not
+   succeed. Read-only transactions will continue to work.

3.0.0 (2019-11-12)
==================

Expand Down
84 changes: 78 additions & 6 deletions src/relstorage/adapters/drivers.py
@@ -389,13 +389,73 @@ class _NoGeventConnectionMixin(object):
    GeventDriverMixin = _NoGeventDriverMixin
    GeventConnectionMixin = _NoGeventConnectionMixin
else:
    import select
    from gevent.socket import wait
    get_hub = gevent.get_hub

    class GeventDriverMixin(object):
        gevent = gevent

    class GeventConnectionMixin(_NoGeventConnectionMixin):
        """
        Helper for a connection that waits using gevent.

        Subclasses must provide a ``fileno()`` method. The usual
        pattern for executing a query would then be something like
        this::

            query = format_query_to_bytes(...)
            self.gevent_wait_write()
            self.send_query()
            self.gevent_wait_read()
            self.read_results()

        It is important that ``send_query`` do nothing but put bytes
        on the wire. It must not include any attempt to wait for a
        response from the database, especially if that response could
        take an arbitrary amount of time or block. (Of course, if
        ``send_query`` and ``read_results`` can arrange to use gevent
        waiting functions too, you'll have finer control. This example
        is all-or-nothing. Sometimes it's easy to handle
        ``read_results`` in a looping function using a server-side
        cursor.)

        The ``gevent_wait_read`` and ``gevent_wait_write`` functions
        are implemented using :func:`gevent.socket.wait`. That
        function always takes a full iteration of the event loop to
        determine whether a file descriptor is ready; it always yields
        control to other greenlets immediately. gevent's own sockets
        don't work that way; instead they try to read/write and catch
        the resulting EAGAIN exception. Only after that do they yield
        to the event loop. This is for good reason: eliminating
        unnecessary switches can lead to higher throughput.

        Here, a pass through the event loop can be risky. If we send a
        request that establishes database locks that will require
        further action from the greenlet to relinquish, those will
        come into being (potentially blocking other greenlets in the
        same or different processes) sometime between when
        ``send_query`` is entered and when ``gevent_wait_read`` exits.
        If, for any reason, a different greenlet runs while we have
        yielded to the event loop and blocks on a resource we own that
        is not gevent cooperative (a non-monkey-patched lock, a
        different database), we'll never regain control. And thus we'll
        never be able to make forward progress and release those
        locks. Since they're shared locks, that could harm arbitrary
        machines in the cluster.

        Thus, we perform a similar optimization to gevent sockets: we
        first check to see if the file descriptor is ready and only
        yield to the event loop if it isn't. The cost is an extra
        system call to ``select``. For write requests, we may be able
        to assume that they are always ready (depending on the
        nature of the protocol); if that's so, override
        :meth:`gevent_check_write`. The same goes for
        :meth:`gevent_check_read`. This doesn't eliminate the problem,
        but it should substantially reduce the chances of it
        happening.
        """

        gevent_sleep = staticmethod(gevent.sleep)

        def close(self):
@@ -420,12 +480,24 @@ def __close_watchers(self):
            self.gevent_write_watcher.close()
            self.gevent_hub = None

+       def gevent_check_read(self):
+           if select.select([self], (), (), 0)[0]:
+               return True
+           return False
+
        def gevent_wait_read(self):
-           self.__check_watchers()
-           wait(self.gevent_read_watcher,
-                hub=self.gevent_hub)
+           if not self.gevent_check_read():
+               self.__check_watchers()
+               wait(self.gevent_read_watcher,
+                    hub=self.gevent_hub)
+
+       def gevent_check_write(self):
+           if select.select((), [self], (), 0)[1]:
+               return True
+           return False
+
        def gevent_wait_write(self):
-           self.__check_watchers()
-           wait(self.gevent_write_watcher,
-                hub=self.gevent_hub)
+           if not self.gevent_check_write():
+               self.__check_watchers()
+               wait(self.gevent_write_watcher,
+                    hub=self.gevent_hub)
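The check-then-wait optimization the docstring describes can be illustrated with plain sockets, independent of gevent and of RelStorage itself (an editorial sketch, not code from this commit): a ``select`` call with a zero timeout reports readiness immediately, so we only pay for an event-loop trip when the descriptor is not ready.

```python
import select
import socket

def ready_to_read(fileobj):
    # Zero timeout: poll readiness right now instead of waiting,
    # analogous to gevent_check_read() in the diff above.
    return bool(select.select([fileobj], (), (), 0)[0])

def ready_to_write(fileobj):
    return bool(select.select((), [fileobj], (), 0)[1])

# A connected socket pair stands in for the database connection.
a, b = socket.socketpair()
try:
    assert not ready_to_read(a)   # no request sent yet
    assert ready_to_write(b)      # the send buffer has room
    b.sendall(b'SELECT 1')        # "send_query": only put bytes on the wire
    assert ready_to_read(a)       # the reply is pending; no need to yield
finally:
    a.close()
    b.close()
```

When the check succeeds, the event-loop round trip (and the risky greenlet switch it implies) is skipped entirely; only a not-ready descriptor falls back to waiting.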
13 changes: 9 additions & 4 deletions src/relstorage/adapters/interfaces.py
@@ -1555,7 +1555,10 @@ def lock_objects_and_detect_conflicts(
class ReplicaClosedException(Exception):
    """The connection to the replica has been closed"""

-class UnableToAcquireCommitLockError(StorageError):
+class UnableToAcquireLockError(Exception):
+    "A lock cannot be acquired."
+
+class UnableToAcquireCommitLockError(StorageError, UnableToAcquireLockError):
    """
    The commit lock cannot be acquired due to a timeout.
@@ -1565,7 +1568,9 @@ class UnableToAcquireCommitLockError(StorageError):
    However, for historical reasons, this exception is not a ``TransientError``.
    """

-class UnableToLockRowsToModifyError(ConflictError):
+# TransientError -> ConflictError -> ReadConflictError
+
+class UnableToLockRowsToModifyError(ConflictError, UnableToAcquireLockError):
    """
    We were unable to lock one or more rows that we intend to modify
    due to a timeout.
@@ -1577,7 +1582,7 @@ class UnableToLockRowsToModifyError(ConflictError):
    This is a type of ``ConflictError``, which is a transient error.
    """

-class UnableToLockRowsToReadCurrentError(ReadConflictError):
+class UnableToLockRowsToReadCurrentError(ReadConflictError, UnableToAcquireLockError):
    """
    We were unable to lock one or more rows that belong to an object
    that ``Connection.readCurrent()`` was called on.
@@ -1589,7 +1594,7 @@ class UnableToLockRowsToReadCurrentError(ReadConflictError):
    This is a type of ``ReadConflictError``, which is a transient error.
    """

-class UnableToAcquirePackUndoLockError(StorageError):
+class UnableToAcquirePackUndoLockError(StorageError, UnableToAcquireLockError):
    """A pack or undo operation is in progress."""
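With the common ``UnableToAcquireLockError`` base introduced above, callers can catch every kind of lock-acquisition failure in one ``except`` clause or ``isinstance`` check. A self-contained sketch of the hierarchy; the ZODB base classes (``StorageError``, ``ConflictError``, ``ReadConflictError``, really from ``ZODB.POSException``) are replaced by minimal stand-ins here:

```python
# Stand-ins for the ZODB exception hierarchy
# (the diff's comment: TransientError -> ConflictError -> ReadConflictError).
class StorageError(Exception): pass
class TransientError(Exception): pass
class ConflictError(TransientError): pass
class ReadConflictError(ConflictError): pass

class UnableToAcquireLockError(Exception):
    "A lock cannot be acquired."

class UnableToAcquireCommitLockError(StorageError, UnableToAcquireLockError):
    pass

class UnableToLockRowsToModifyError(ConflictError, UnableToAcquireLockError):
    pass

class UnableToLockRowsToReadCurrentError(ReadConflictError, UnableToAcquireLockError):
    pass

class UnableToAcquirePackUndoLockError(StorageError, UnableToAcquireLockError):
    pass

def is_lock_failure(exc):
    # One isinstance check now covers all four concrete errors.
    return isinstance(exc, UnableToAcquireLockError)

assert is_lock_failure(UnableToAcquireCommitLockError())
assert is_lock_failure(UnableToLockRowsToReadCurrentError())
assert not is_lock_failure(StorageError())
```

Note the split the docstrings describe survives: the commit-lock error is deliberately not transient, while the row-locking errors inherit conflict (and hence transient) semantics; the new base cuts across that split without changing it.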
2 changes: 1 addition & 1 deletion src/relstorage/adapters/locker.py
@@ -253,7 +253,7 @@ def reraise_commit_lock_error(self, cursor, lock_stmt, kind):
        logger.debug("Failed to acquire commit lock:\n%s", debug_info)
        message = "Acquiring a commit lock failed: %s%s" % (
            sys.exc_info()[1],
-           '\n' + debug_info if debug_info else ''
+           '\n' + debug_info if debug_info else '(No debug info.)'
        )
        six.reraise(
            kind,
41 changes: 11 additions & 30 deletions src/relstorage/adapters/mysql/adapter.py
@@ -65,6 +65,7 @@
from ..dbiter import HistoryPreservingDatabaseIterator
from ..interfaces import IRelStorageAdapter
from ..interfaces import UnableToLockRowsToReadCurrentError
+from ..interfaces import UnableToLockRowsToModifyError
from ..poller import Poller
from ..scriptrunner import ScriptRunner
from ..batch import RowBatcher
@@ -292,12 +293,12 @@ def _best_lock_objects_and_detect_conflicts(self, cursor, read_current_oids):
        # doesn't matter.
        read_current_param = json.dumps(list(read_current_oids.items()))

-       proc = 'lock_objects_and_detect_conflicts(%s)'
+       proc = 'lock_objects_and_detect_conflicts(%s, %s)'
        try:
            multi_results = self.driver.callproc_multi_result(
                cursor,
                proc,
-               (read_current_param,)
+               (read_current_param, 0)
            )
        except self.locker.lock_exceptions as e:
            # On MySQL 5.7, the time-based mechanism to determine that
@@ -313,39 +314,19 @@ def _best_lock_objects_and_detect_conflicts(self, cursor, read_current_oids):
                    self._describe_best_lock_objects_and_detect_conflicts(),
                    UnableToLockRowsToReadCurrentError
                )
+           elif 'exclusive locks' in str(e):
+               self.locker.reraise_commit_lock_error(
+                   cursor,
+                   self._describe_best_lock_objects_and_detect_conflicts(),
+                   UnableToLockRowsToModifyError
+               )
            raise

        # There's always a useless last result, the result of the stored procedure itself.
        proc_result = multi_results.pop()
        assert not proc_result, proc_result

-       # With read_current_oids, the proc returns one or two results,
-       # either of which may be empty. If it returns one, that's
-       # because it detected a read conflict and aborted before
-       # trying to lock other rows. If it returns two, the first will
-       # always be empty because there was no read conflict.
-       #
-       # If we didn't have read_current_oids, it will only return a
-       # single result, the conflicts (which may be empty.)
+       # If these asserts fail, it's a good sign that we're running with an
+       # incompatible version of our stored procedures.

-       if read_current_oids:
-           if len(multi_results) == 1:
-               # We quit before we checked for conflicts. Must be
-               # a read conflict of length 1.
-               read_conflicts = multi_results[0]
-               assert len(read_conflicts) == 1, multi_results
-               assert read_conflicts[0][-1] is None, multi_results
-               conflicts = read_conflicts
-           else:
-               assert len(multi_results) == 2, multi_results
-               assert not multi_results[0], multi_results
-               conflicts = multi_results[1]
-       else:
-           # Only conflicts were checked and returned.
-           assert len(multi_results) == 1, multi_results
-           conflicts = multi_results[0]
+       assert len(multi_results) == 1, multi_results
+       conflicts = multi_results[0]

        return conflicts
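The ``except`` clause earlier in this method picks an exception type by inspecting the database error's message text. A standalone sketch of that dispatch pattern; note that only the ``'exclusive locks'`` substring appears in this diff, while the ``'read_current'`` marker and these bare exception classes are stand-ins for illustration:

```python
class UnableToLockRowsToReadCurrentError(Exception):
    """Stand-in for the readCurrent lock failure."""

class UnableToLockRowsToModifyError(Exception):
    """Stand-in for the modify-lock failure."""

def classify_lock_failure(exc):
    # MySQL reports which kind of lock timed out only in the error
    # message, so we dispatch on substrings of str(exc).
    msg = str(exc)
    if 'read_current' in msg:      # assumed marker for readCurrent rows
        return UnableToLockRowsToReadCurrentError
    if 'exclusive locks' in msg:   # marker used in the diff above
        return UnableToLockRowsToModifyError
    return None                    # unrecognized: re-raise the original

assert classify_lock_failure(
    RuntimeError('could not obtain exclusive locks')) is UnableToLockRowsToModifyError
assert classify_lock_failure(RuntimeError('deadlock found')) is None
```

Returning ``None`` for unrecognized messages mirrors the diff's fall-through ``raise``: anything the dispatcher cannot classify propagates unchanged.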

12 changes: 12 additions & 0 deletions src/relstorage/adapters/mysql/drivers/__init__.py
@@ -181,6 +181,9 @@ def callproc_multi_result(self, cursor, proc,
            ...
        ]

+       The last item in the list is the result of the stored procedure
+       itself.
+
        Note that, because 'CALL' potentially returns multiple result
        sets, there is potentially at least one extra database round
        trip involved when we call ``cursor.nextset()``. If the
@@ -217,6 +220,15 @@ def callproc_multi_result(self, cursor, proc,
            multi_results.append(cursor.fetchall())
        return multi_results

+   def callproc_no_result(self, cursor, proc, args=()):
+       """
+       An optimization for calling stored procedures that don't
+       produce any results, except for the null result of calling the
+       procedure itself.
+       """
+       cursor.execute('CALL ' + proc, args)
+       cursor.fetchall()
+
    def exit_critical_phase(self, connection, cursor):
        "Override if you implement critical phases."
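The ``callproc_no_result`` optimization above skips the ``nextset()`` drain that ``callproc_multi_result`` needs. A self-contained sketch of that drain loop, using a stub cursor with canned data (the stub, its data, and this free-function form are invented for illustration; the real method lives on the MySQL driver class):

```python
class FakeCursor:
    """Minimal DB-API-style cursor stub yielding canned result sets."""
    def __init__(self, result_sets):
        self._sets = list(result_sets)
        self._current = None

    def execute(self, stmt, args=()):
        # Pretend the CALL ran; expose the first result set.
        self._current = self._sets.pop(0)

    def fetchall(self):
        return self._current

    def nextset(self):
        # Advance to the next result set, if any.
        if self._sets:
            self._current = self._sets.pop(0)
            return True
        return False

def callproc_multi_result(cursor, proc, args=()):
    # Drain every result set the CALL produces.
    cursor.execute('CALL ' + proc, args)
    multi_results = [cursor.fetchall()]
    while cursor.nextset():
        multi_results.append(cursor.fetchall())
    return multi_results

cursor = FakeCursor([[(1, 'conflict')], []])
results = callproc_multi_result(
    cursor, 'lock_objects_and_detect_conflicts(%s, %s)', ('[]', 0))
assert results.pop() == []             # the CALL's own (useless) result
assert results == [[(1, 'conflict')]]  # one real result set remains
```

This is exactly the shape the adapter relies on: it pops the procedure's own empty result, then asserts a single remaining result set.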

Expand Down
