Skip to content

Commit

Permalink
Merge pull request #455 from zodb/issue454
Browse files Browse the repository at this point in the history
Fix write replica selection after a change.
  • Loading branch information
jamadden committed Apr 16, 2021
2 parents 6619b0a + d2c1205 commit b0553d8
Show file tree
Hide file tree
Showing 9 changed files with 144 additions and 28 deletions.
3 changes: 2 additions & 1 deletion CHANGES.rst
Expand Up @@ -5,7 +5,8 @@
3.4.2 (unreleased)
==================

- Nothing changed yet.
- Fix write replica selection after a disconnect, and generally
further improve handling of unexpectedly closed store connections.


3.4.1 (2021-04-12)
Expand Down
53 changes: 33 additions & 20 deletions src/relstorage/adapters/connections.py
Expand Up @@ -53,6 +53,8 @@ def __init__(self, connmanager):
self._new_connection = getattr(connmanager, self._NEW_CONNECTION_NAME)
self._restart = getattr(connmanager, self._RESTART_NAME)
self._rollback = getattr(connmanager, self._ROLLBACK_NAME)
# TODO: connmanager has this private, promote to public.
self._closed_exceptions = connmanager._ignored_exceptions

# Hook functions
on_opened = staticmethod(lambda conn, cursor: None)
Expand Down Expand Up @@ -131,13 +133,18 @@ def rollback_quietly(self):
cur = self._cursor
self.__dict__.pop('cursor', None) # force on_first_use to be called.
clean_rollback = self._rollback(conn, cur)
assert clean_rollback is not None
if not clean_rollback:
self.drop()
assert self.connection is None and self._cursor is None

self.on_rolledback(self.connection, self._cursor)
return clean_rollback

def open_if_needed(self):
# XXX: This bypasses putting _cursor into self.__dict__
# as cursor, so that this object appears true.
# That's a bit surprising. Should it be this way?
if self.connection is None or self._cursor is None:
self.drop()
self._open_connection()
Expand All @@ -161,25 +168,25 @@ def _restart_connection(self):
def restart(self):
"""
Restart the connection if there is any chance that it has any associated state.
"""
if not self:
assert not self.active, self.__dict__
if self.connection is not None: # But we have no cursor We
# do this so that if the connection has closed itself
# (or been closed by a unit test) we can detect that
# and restart automatically. We only actually do
# anything there for store connections.
self._restart_connection()
return

# If we've never accessed the cursor, we shouldn't have any
# state to restart.
if not self.active and 'cursor' not in self.__dict__:
return

self.active = False
self.restart_and_call(self.__noop)
If the connection has been disconnected, this may drop it.
"""
try:
if not self:
assert not self.active, self.__dict__
if self.connection is not None: # But we have no cursor
# We do this so that if the connection has closed
# itself (or been closed by a unit test) we can detect
# that and restart automatically. We only actually do
# anything there for store connections.
self._restart_connection()
return

self.active = False
self.restart_and_call(self.__noop)
except self._closed_exceptions:
# Uh-oh, the thing we wanted went away.
self.drop()

def restart_and_call(self, f, *args, **kw):
"""
Expand Down Expand Up @@ -239,7 +246,7 @@ def call(self, f, can_reconnect, *args, **kwargs):

try:
return f(self.connection, self._cursor, fresh_connection, *args, **kwargs)
except self.connmanager.driver.disconnected_exceptions as e:
except self._closed_exceptions as e:
# XXX: This feels like it's factored wrong.
if not can_reconnect:
raise
Expand Down Expand Up @@ -303,7 +310,13 @@ def begin(self):
self.connmanager.begin(*self.open_if_needed())

def _restart_connection(self):
self.rollback_quietly()
if self.rollback_quietly():
# If we failed to rollback, we dropped the connection,
# so there's no restarting.
self.connmanager.restart_store(self.connection,
self._cursor,
needs_rollback=False)


class PrePackConnection(StoreConnection):
__slots__ = ()
Expand Down
1 change: 1 addition & 0 deletions src/relstorage/adapters/connmanager.py
Expand Up @@ -86,6 +86,7 @@ class AbstractConnectionManager(object):
# take this as the union of the driver's close exceptions and disconnected
# exceptions (drivers aren't required to organize them to overlap, but
# in practice they should.)
# TODO: Promote to public, connections.py is using this.
_ignored_exceptions = ()

# Subclasses should set these to get semantics as close
Expand Down
5 changes: 4 additions & 1 deletion src/relstorage/adapters/mysql/connmanager.py
Expand Up @@ -136,9 +136,12 @@ def rollback_store_quietly(self, conn, cur):
# MySQL leaks state in temporary tables across
# transactions, so if we don't drop the connection,
# we need to clean up temp tables (if they exist!)
if self.rollback_quietly(conn, cur):
clean = self.rollback_quietly(conn, cur)
if clean:
try:
cur.execute('CALL clean_temp_state(true)')
cur.fetchall()
except self.driver.driver_module.Error:
log.debug("Failed to clean temp state; maybe not exists yet?")
# But we still consider it a clean rollback
return clean
1 change: 0 additions & 1 deletion src/relstorage/adapters/mysql/mover.py
Expand Up @@ -48,7 +48,6 @@ def on_store_opened(self, cursor, restart=False):
#
# It's possible that the DDL lock that TRUNCATE takes can be a bottleneck
# in some places, though?

self.driver.callproc_no_result(
cursor,
"clean_temp_state(false)"
Expand Down
Expand Up @@ -40,7 +40,7 @@ BEGIN
-- and possibly lead to database issues.
-- See https://buttondown.email/nelhage/archive/22ab771c-25b4-4cd9-b316-31a86f737acc
-- We document this in docs/postgresql/index.rst
RETURN QUERY
RETURN QUERY -- This just adds to the result table; a final bare ``RETURN`` actually ends execution
WITH locked AS (
SELECT {CURRENT_OBJECT}.zoid, {CURRENT_OBJECT}.tid, t.tid AS desired
FROM {CURRENT_OBJECT}
Expand All @@ -59,7 +59,8 @@ BEGIN

-- Unlike MySQL, we can simply do the SELECT (with PERFORM) for its
-- side effects to lock the rows.
-- This one will block.
-- This one will block. (We set the PG configuration variable ``lock_timeout``
-- from the ``commit-lock-timeout`` configuration variable to determine how long.)

-- A note on the query: PostgreSQL will typcially choose a
-- sequential scan on the temp_store table and do a nested loop join
Expand Down
2 changes: 1 addition & 1 deletion src/relstorage/adapters/sqlite/mover.py
Expand Up @@ -80,8 +80,8 @@ def on_store_opened(self, cursor, restart=False):
# connection is usually meant to be in auto-commit mode and shouldn't take
# exclusive locks until tpc_vote time. Prior to Python 3.6, if we were
# in a transaction this would commit it. but we shouldn't be in a transaction.
assert not cursor.connection.in_transaction
if not restart:
assert not cursor.connection.in_transaction, cursor
Sqlite3ScriptRunner().run_script(cursor, self.__on_store_opened_script)
assert not cursor.connection.in_transaction
cursor.connection.register_before_commit_cleanup(self._before_commit)
Expand Down
96 changes: 95 additions & 1 deletion src/relstorage/adapters/tests/test_connections.py
Expand Up @@ -27,14 +27,15 @@ class TestConnectionCommon(TestCase):
iface = IManagedStoreConnection

def _makeArgument(self):
return MockConnectionManager()
return MockConnectionManager(clean_rollback=False)

def _makeOne(self):
return self.klass(self._makeArgument())

def test_provides(self):
assert_that(self._makeOne(), validly_provides(self.iface))


class TestConnection(TestConnectionCommon):

def test_restart_and_call_does_not_activate(self):
Expand Down Expand Up @@ -203,3 +204,96 @@ def test_returning_dropped_does_not_add_to_pool(self):
assert conn.connection is None
pool.replace(conn)
self.assertEqual(0, pool.pooled_connection_count)

def _makeSemiConnManager(self):
"""
Return a real conn manager, with minimal mocking.
"""
from ..connmanager import AbstractConnectionManager
from relstorage.tests import MockOptions
from relstorage.tests import MockDriver
from relstorage.tests import MockConnection

class ConnManager(AbstractConnectionManager):
def __init__(self, options=None, driver=None):
super(ConnManager, self).__init__(options or MockOptions(),
driver or MockDriver())

self.begin_count = 0
self.restart_count = 0

def restart_store(self, *_args, **_kwargs):
self.restart_count += 1
super(ConnManager, self).restart_store(*_args, **_kwargs)

def begin(self, conn, cursor):
self.begin_count += 1
super(ConnManager, self).begin(conn, cursor)

def open(self, *_args, **_kwargs):
conn = MockConnection()
return conn, conn.cursor()

def _do_open_for_load(self, *_args, **_kwargs):
raise NotImplementedError
return ConnManager

def test_borrow_calls_on_opened_restart(self):
on_opened_count = []
def do_on_store_opened(_cursor, restart=None):
on_opened_count.append(restart)

connmanager = self._makeSemiConnManager()()
connmanager.add_on_store_opened(do_on_store_opened)

pool = StoreConnectionPool(connmanager)

# restart isn't called to borrow the connection
store_conn = pool.borrow()
self.assertEqual(1, connmanager.begin_count)
self.assertEqual(0, connmanager.restart_count)
# nor to open it,
store_conn.open_if_needed()
# nor to get the cursor, making it active.
getattr(store_conn, 'cursor')
self.assertTrue(store_conn)
self.assertTrue(store_conn.active)
self.assertEqual(1, connmanager.begin_count)
self.assertEqual(0, connmanager.restart_count)
self.assertEqual(on_opened_count, [False])

pool.replace(store_conn)
self.assertFalse(store_conn)
self.assertFalse(store_conn.active)

# Begin is called on borrowing,
# and so is restart
store_conn2 = pool.borrow()
self.assertIs(store_conn, store_conn2)
self.assertEqual(2, connmanager.begin_count)
self.assertEqual(1, connmanager.restart_count)
self.assertEqual(on_opened_count, [False, True])

def test_restart_handles_replica_closed(self):
class MockReplicaSelector(object):
def current(self):
return 1

connmanager = self._makeSemiConnManager()()
assert hasattr(connmanager, 'replica_selector')
connmanager.replica_selector = MockReplicaSelector()
pool = StoreConnectionPool(connmanager)

store_conn = pool.borrow()
orig_raw_conn = store_conn.connection

store_conn.open_if_needed()
getattr(store_conn, 'cursor')
pool.replace(store_conn)

store_conn2 = pool.borrow()
# Same object,
self.assertIs(store_conn, store_conn2)
del store_conn
# But different internals
self.assertIsNot(store_conn2.connection, orig_raw_conn)
6 changes: 5 additions & 1 deletion src/relstorage/tests/__init__.py
Expand Up @@ -14,6 +14,7 @@
from relstorage._compat import ABC
from relstorage.options import Options
from relstorage.adapters.sql import DefaultDialect
from relstorage.adapters.interfaces import ReplicaClosedException

try:
from unittest import mock as _mock
Expand Down Expand Up @@ -297,12 +298,14 @@ class MockConnectionManager(object):

isolation_load = 'SERIALIZABLE'
clean_rollback = None
_ignored_exceptions = (ReplicaClosedException,)

def __init__(self, driver=None, clean_rollback=None):
if driver is None:
self.driver = MockDriver()
if clean_rollback is not None:
self.clean_rollback = clean_rollback
self._ignored_exceptions = (ReplicaClosedException,) + self.driver.disconnected_exceptions

def rollback_quietly(self, conn, cursor): # pylint:disable=unused-argument
if hasattr(conn, 'rollback'):
Expand Down Expand Up @@ -407,7 +410,8 @@ def rollback(self, conn):
conn.rollback()

def synchronize_cursor_for_rollback(self, cursor):
cursor.fetchall()
if cursor is not None:
cursor.fetchall()

class MockObjectMover(object):
def __init__(self):
Expand Down

0 comments on commit b0553d8

Please sign in to comment.