Skip to content

Commit

Permalink
Fix deadlocks in MySQL new_oid and set_min_oid.
Browse files Browse the repository at this point in the history
  • Loading branch information
jamadden committed Jul 11, 2019
1 parent 040ccfd commit 4f6bf78
Show file tree
Hide file tree
Showing 13 changed files with 237 additions and 53 deletions.
5 changes: 5 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
returned. Now, it frees the event loop after sending the request.
See :issue:`272`.

- Call ``set_min_oid`` less often if a storage is just updating
existing objects, not creating its own.

- Fix an occasional possible deadlock in MySQL's ``set_min_oid``.

3.0a4 (2019-07-10)
==================

Expand Down
3 changes: 0 additions & 3 deletions src/relstorage/adapters/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -679,9 +679,6 @@ def list_changes(cursor, after_tid, last_tid):
class ISchemaInstaller(Interface):
"""Install the schema in the database, clear it, or uninstall it"""

def create(cursor):
"""Create the database tables, sequences, etc."""

def prepare():
"""
Create the database schema if it does not already exist.
Expand Down
5 changes: 3 additions & 2 deletions src/relstorage/adapters/mysql/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ def __init__(self, options=None, **params):
)
self.connmanager.add_on_store_opened(self.locker.on_store_opened)
self.schema = MySQLSchemaInstaller(
driver=driver,
connmanager=self.connmanager,
runner=self.runner,
keep_history=self.keep_history,
Expand All @@ -120,7 +121,7 @@ def __init__(self, options=None, **params):
)
self.connmanager.add_on_store_opened(self.mover.on_store_opened)
self.connmanager.add_on_load_opened(self.mover.on_load_opened)
self.oidallocator = MySQLOIDAllocator()
self.oidallocator = MySQLOIDAllocator(driver)
self.txncontrol = MySQLTransactionControl(
connmanager=self.connmanager,
keep_history=self.keep_history,
Expand Down Expand Up @@ -185,7 +186,7 @@ def _prepare_get_latest_tid(self, cursor, restart=False):
cursor.execute(stmt)

def new_instance(self):
return MySQLAdapter(options=self.options, **self._params)
return type(self)(options=self.options, **self._params)

def __str__(self):
parts = [self.__class__.__name__]
Expand Down
19 changes: 19 additions & 0 deletions src/relstorage/adapters/mysql/drivers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,25 @@ def cursor(self, conn):
cursor.execute(self.MY_CHARSET_STMT)
return cursor

def callproc_multi_result(self, cursor, proc, args=()):
"""
Some drivers need extra arguments to execute a statement that
returns multiple results, and they don't all use the standard
way to retrieve them, so use this.
Returns a list of lists of rows: [
[[row in first], ...],
[[row in second], ...],
...
]
"""
cursor.execute('CALL ' + proc, args)

multi_results = [cursor.fetchall()]
while cursor.nextset():
multi_results.append(cursor.fetchall())
return multi_results


implement_db_driver_options(
__name__,
Expand Down
21 changes: 18 additions & 3 deletions src/relstorage/adapters/mysql/drivers/mysqlconnector.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,12 @@ class PyMySQLConnectorDriver(AbstractMySQLDriver):
_GEVENT_CAPABLE = True
_GEVENT_NEEDS_SOCKET_PATCH = True

if PYPY:
def __init__(self):
super(PyMySQLConnectorDriver, self).__init__()
def __init__(self):
super(PyMySQLConnectorDriver, self).__init__()
# conn.close() -> InternalError: Unread result found
# By the time we get to a close(), it's too late to do anything about it.
self.close_exceptions += (self.driver_module.InternalError,)
if PYPY:
# Patch to work around JIT bug found in (at least) 7.1.1
# https://bitbucket.org/pypy/pypy/issues/3014/jit-issue-inlining-structunpack-hh
try:
Expand Down Expand Up @@ -160,6 +163,18 @@ def set_autocommit(self, conn, value):
# This implementation uses a property instead of a method.
conn.autocommit = value

def callproc_multi_result(self, cursor, proc, args=()):
# This driver is weird, wants multi=True, returns an iterator of cursors
# instead of using nextset()
resultsets = cursor.execute("CALL " + proc, args, multi=True)
multi_results = []
for resultset in resultsets:
try:
multi_results.append(resultset.fetchall())
except self.driver_module.InterfaceError:
# This gets raised on the empty set at the end, for some reason.
break
return multi_results

class CMySQLConnectorDriver(PyMySQLConnectorDriver):
__name__ = 'C ' + _base_name
Expand Down
81 changes: 59 additions & 22 deletions src/relstorage/adapters/mysql/oidallocator.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class MySQLOIDAllocator(AbstractOIDAllocator):
# After this many allocated OIDs should the (unlucky) thread that
# allocated the one evenly divisible by this number attempt to remove
# old OIDs.
garbage_collect_interval = 10001
garbage_collect_interval = 100001

# How many OIDs to attempt to delete at any one request. Keeping
# this on the small side relative to the interval limits the
Expand All @@ -49,12 +49,15 @@ class MySQLOIDAllocator(AbstractOIDAllocator):
# already has a custom timeout.
garbage_collect_batch_timeout = 5

def __init__(self):
_cached_next_n = 0

def __init__(self, driver):
"""
:param type disconnected_exception: The exception to raise when
we get an invalid value for ``lastrowid``.
"""
AbstractOIDAllocator.__init__(self)
self.driver = driver

# https://dev.mysql.com/doc/refman/5.7/en/example-auto-increment.html
# "Updating an existing AUTO_INCREMENT column value in an InnoDB
Expand Down Expand Up @@ -88,16 +91,35 @@ def __init__(self):
# true even for MyISAM and InnoDB, which normally do not reuse
# sequence values."
def set_min_oid(self, cursor, oid):
"""Ensure the next OID is at least the given OID."""
n = (oid + 15) // 16
# If the table is empty, MAX(zoid) returns NULL, which
# of course fails the comparison and so nothing gets inserted.
stmt = """
INSERT INTO new_oid (zoid)
SELECT %s
WHERE %s > (SELECT COALESCE(MAX(zoid), 0) FROM new_oid)
"""
cursor.execute(stmt, (n, n))
Ensure the next OID is at least the given OID.
"""
# A simple statement like the following can easily deadlock:
#
# INSERT INTO new_oid (zoid)
# SELECT %s
# WHERE %s > (SELECT COALESCE(MAX(zoid), 0) FROM new_oid)
#
#
# Session A: new_oid() -> Lock 1
# Session B: new_oid() -> Lock 2 (row 1 is invisible)
# Session A: new_oid() -> Lock 3 (row 2 is invisible)
# Session B: set_min_oid(2) -> Hang waiting for lock
# Session A: new_oid() -> Lock 4: Deadlock, Session B rolls back.
#
# Partly this is because MAX() is local to the current session.
# We deal with this by using a stored procedure to efficiently make
# multiple queries.

n = (oid + 15) // 16
multi_results = self.driver.callproc_multi_result(cursor, 'set_min_oid(%s)', (n,))
next_n, = multi_results[0][0]

# A side effect of checking may be allocating from the sequence.
# If it increased, we have a value we can use, because no one else can
# use that same value.
if next_n and next_n > n:
self._cached_next_n = max(next_n, self._cached_next_n)

@metricmethod
def new_oids(self, cursor):
Expand All @@ -123,15 +145,19 @@ def new_oids(self, cursor):
#
# Our solution is to just let rows build up if the delete fails. Eventually
# a GC, which happens at startup, will occur and hopefully get most of them.
stmt = "INSERT INTO new_oid VALUES ()"
cursor.execute(stmt)

# This is a DB-API extension. Fortunately, all
# supported drivers implement it. (In the past we used
# cursor.connection.insert_id(), which was specific to MySQLdb
# and PyMySQL.)
# 'SELECT LAST_INSERT_ID()' is the pure-SQL way to do this.
n = cursor.lastrowid
if self._cached_next_n:
n = self._cached_next_n
self._cached_next_n = 0
else:
stmt = "INSERT INTO new_oid VALUES ()"
cursor.execute(stmt)

# This is a DB-API extension. Fortunately, all
# supported drivers implement it. (In the past we used
# cursor.connection.insert_id(), which was specific to MySQLdb
# and PyMySQL.)
# 'SELECT LAST_INSERT_ID()' is the pure-SQL way to do this.
n = cursor.lastrowid

if n % self.garbage_collect_interval == 0:
self.garbage_collect_oids(cursor, n)
Expand Down Expand Up @@ -166,8 +192,19 @@ def garbage_collect_oids(self, cursor, max_value=None):
try:
cursor.execute(stmt, params)
except Exception: # pylint:disable=broad-except
# Luckily, a deadlock only rolls back the previous statement, not the
# whole transaction.
# Luckily, a deadlock only rolls back the previous
# statement, not the whole transaction.
#
# XXX: No, that's not true. A general error, like
# a lock timeout, will roll back the previous
# statement. A deadlock rolls back the whole
# transaction. We're lucky the difference here
# doesn't make any difference: we don't actually
# write anything to the database temp tables, etc,
# until the storage enters commit(), at which
# point we shouldn't need to allocate any more new
# oids.
#
# TODO: We'd prefer to only do this for errcode 1213: Deadlock.
# MySQLdb raises this as an OperationalError; what do all the other
# drivers do?
Expand Down
82 changes: 79 additions & 3 deletions src/relstorage/adapters/mysql/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,61 @@

logger = __import__('logging').getLogger(__name__)

# Procedure to set the minimum next OID in one trip to the server.
# This returns a result, so we'd like to use a function (which we'd access
# with `SELECT set_min_oid(...)`), but the MySQL manual warns that functions take
# out table locks and limit concurrency, plus have some replication issues,
# so we don't do that. Instead we have a procedure. The procedure should be called
# using `cursor.execute('CALL set_min_oid(%s)` instead of cursor.callproc()
# --- callproc() involves setting server variables in an extra trip to the server.
# That would enable us to use an OUT param for the result, but getting that would be another
# round trip too. Don't forget to use nextset() to get the result of the procedure itself
# after getting the new oid.
_SET_MIN_OID = """
CREATE PROCEDURE set_min_oid(min_oid BIGINT)
BEGIN
-- In order to avoid deadlocks, we only do this if
-- the number we want to insert is strictly greater than
-- what the current sequence value is. If we use a value less
-- than that, there's a chance a different session has already allocated
-- and inserted that value into the table, meaning its locked.
-- We obviously cannot JUST use MAX(zoid) to find this value, we can't see
-- what other sessions have done. But if that's already >= to the min_oid,
-- then we don't have to do anything.
DECLARE next_oid BIGINT;
SELECT COALESCE(MAX(ZOID), 0)
INTO next_oid
FROM new_oid;
IF next_oid < min_oid THEN
-- Can't say for sure. Just because we can only see values
-- less doesn't mean they're not there in another transaction.
-- This will never block.
INSERT INTO new_oid VALUES ();
SELECT LAST_INSERT_ID()
INTO next_oid;
IF min_oid > next_oid THEN
-- This is unlikely to block. We just confirmed that the
-- sequence value is strictly less than this, so no one else
-- should be doing this.
INSERT IGNORE INTO new_oid (zoid)
VALUES (min_oid);
SET next_oid = min_oid;
END IF;
ELSE
-- Return a NULL value to signal that this value cannot
-- be cached and used because we didn't allocate it.
SET next_oid = NULL;
END IF;
SELECT next_oid;
END;
"""

@implementer(ISchemaInstaller)
class MySQLSchemaInstaller(AbstractSchemaInstaller):

Expand All @@ -46,11 +101,24 @@ class MySQLSchemaInstaller(AbstractSchemaInstaller):
'pack_state_tid',
)

procedures = {
'set_min_oid': _SET_MIN_OID,
}

def __init__(self, driver=None, **kwargs):
self.driver = driver
super(MySQLSchemaInstaller, self).__init__(**kwargs)

def get_database_name(self, cursor):
cursor.execute("SELECT DATABASE()")
for (name,) in cursor:
return self._metadata_to_native_str(name)

def list_procedures(self, cursor):
cursor.execute("SHOW PROCEDURE STATUS WHERE db = database()")
native = self._metadata_to_native_str
return [native(row['name']) for row in self._rows_as_dicts(cursor)]

def list_tables(self, cursor):
return list(self.__list_tables_and_engines(cursor))

Expand Down Expand Up @@ -117,7 +185,7 @@ def _create_temp_undo(self, _cursor):

def _reset_oid(self, cursor):
from .oidallocator import MySQLOIDAllocator
MySQLOIDAllocator().reset_oid(cursor)
MySQLOIDAllocator(self.driver).reset_oid(cursor)

def __convert_all_tables_to_innodb(self, cursor):
tables = self.__list_tables_not_innodb(cursor)
Expand All @@ -131,7 +199,15 @@ def _prepare_with_connection(self, conn, cursor):
from .oidallocator import MySQLOIDAllocator
self.__convert_all_tables_to_innodb(cursor)
super(MySQLSchemaInstaller, self)._prepare_with_connection(conn, cursor)
MySQLOIDAllocator().garbage_collect_oids(cursor)
MySQLOIDAllocator(self.driver).garbage_collect_oids(cursor)

def create_procedures(self, cursor):
# TODO: Handle updates when we change the text.
installed = self.list_procedures(cursor)
for name, create_stmt in self.procedures.items():
__traceback_info__ = name
if name not in installed:
cursor.execute(create_stmt)

# We can't TRUNCATE tables that have foreign-key relationships
# with other tables, but we can drop them. This has to be followed up by
Expand All @@ -141,7 +217,7 @@ def _prepare_with_connection(self, conn, cursor):
def _after_zap_all_tables(self, cursor, slow=False):
if not slow:
logger.debug("Creating tables after drop")
self.create(cursor)
self.create_tables(cursor)
logger.debug("Done creating tables after drop")
else:
super(MySQLSchemaInstaller, self)._after_zap_all_tables(cursor, slow)
4 changes: 3 additions & 1 deletion src/relstorage/adapters/oracle/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ def prepare(self):
def callback(_conn, cursor):
tables = self.list_tables(cursor)
if 'object_state' not in tables:
self.create(cursor)
self.create_tables(cursor)
else:
self.check_compatibility(cursor, tables)
self.update_schema(cursor, tables)
Expand Down Expand Up @@ -216,6 +216,8 @@ def list_packages(self, cursor):
res[name.lower()] = version
return res

list_procedures = list_packages

def _create_pack_lock(self, cursor):
stmt = "CREATE TABLE pack_lock (dummy CHAR);"
self.runner.run_script(cursor, stmt)
Expand Down
3 changes: 3 additions & 0 deletions src/relstorage/adapters/postgresql/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,9 @@ def list_procedures(self, cursor):
cursor.execute(stmt)
res = {}
for (name, text) in cursor.fetchall():
name = self._metadata_to_native_str(name)
text = self._metadata_to_native_str(text)

version = None
match = re.search(r'Version:\s*([0-9a-zA-Z.]+)', text)
if match is not None:
Expand Down
Loading

0 comments on commit 4f6bf78

Please sign in to comment.