Skip to content

Commit

Permalink
Merge 280038f into 47d49e5
Browse files Browse the repository at this point in the history
  • Loading branch information
jamadden committed Jul 23, 2019
2 parents 47d49e5 + 280038f commit 188d9bb
Show file tree
Hide file tree
Showing 24 changed files with 578 additions and 223 deletions.
17 changes: 12 additions & 5 deletions CHANGES.rst
Expand Up @@ -17,10 +17,10 @@
still paid the overhead of locking method calls and code complexity.

The top-level storage (the one belonging to a ``ZODB.DB``) still
used heavyweight locks. ``ZODB.DB.storage`` is documented as being
only useful for tests, and the ``DB`` object itself does not expose
any operations that use the storage in a way that would require
thread safety.
used heavyweight locks in earlier releases. ``ZODB.DB.storage`` is
documented as being only useful for tests, and the ``DB`` object
itself does not expose any operations that use the storage in a way
that would require thread safety.

The remaining thread safety support has been removed. This
simplifies the code and reduces overhead.
Expand Down Expand Up @@ -58,7 +58,14 @@
against the ``transaction`` table in history-preserving databases.
See :issue:`159`.

- Prepare more statements.
- Prepare more statements used during regular polling.

- Gracefully handle certain disconnected exceptions when rolling back
connections in between transactions. See :issue:`280`.

- Refactor RelStorage internals for a cleaner separation of concerns.
This includes how (some) queries are written and managed, making it
easier to prepare statements, but only those actually used.

3.0a5 (2019-07-11)
==================
Expand Down
123 changes: 88 additions & 35 deletions src/relstorage/adapters/connmanager.py
Expand Up @@ -16,26 +16,21 @@
from perfmetrics import metricmethod
from zope.interface import implementer


from .interfaces import IConnectionManager
from .interfaces import ReplicaClosedException
from .replica import ReplicaSelector

logger = __import__('logging').getLogger(__name__)

@implementer(IConnectionManager)
class AbstractConnectionManager(object):
"""Abstract base class for connection management.
"""
Abstract base class for connection management.
Responsible for opening and closing database connections.
"""

# disconnected_exceptions contains the exception types that might be
# raised when the connection to the database has been broken.
disconnected_exceptions = (ReplicaClosedException,)

# close_exceptions contains the exception types to ignore
# when the adapter attempts to close a database connection.
close_exceptions = ()

# a series of callables (cursor, restart=bool)
# for when a store connection is opened.
_on_store_opened = ()
Expand All @@ -44,18 +39,39 @@ class AbstractConnectionManager(object):
# when a load connection is opened
_on_load_opened = ()

# psycopg2 raises ProgrammingError if we rollback when nothing is present.
# mysql-connector-python raises InterfaceError.
# OTOH, mysqlclient raises nothing and even wants it.
# psycopg2 raises ProgrammingError if we rollback when no results
# are present on the cursor. mysql-connector-python raises
# InterfaceError. OTOH, mysqlclient raises nothing and even wants
# it in certain circumstances.
#
# Subclasses should set this statically.
_fetchall_on_rollback = True

def __init__(self, options):
# options is a relstorage.options.Options instance
# The list of exceptions to ignore on a rollback *or* close. We
# take this as the union of the driver's close exceptions and disconnected
# exceptions (drivers aren't required to organize them to overlap, but
# in practice they should.)
_ignored_exceptions = ()

replica_selector = None

def __init__(self, options, driver):
"""
:param options: A :class:`relstorage.options.Options`.
:param driver: A :class:`relstorage.adapters.interfaces.IDBDriver`,
which we use for its exceptions.
"""
self.driver = driver

self._ignored_exceptions = tuple(set(
driver.close_exceptions
+ driver.disconnected_exceptions
+ (ReplicaClosedException,)
))

if options.replica_conf:
self.replica_selector = ReplicaSelector(
options.replica_conf, options.replica_timeout)
else:
self.replica_selector = None

if options.ro_replica_conf:
self.ro_replica_selector = ReplicaSelector(
Expand Down Expand Up @@ -94,40 +110,76 @@ def open(self, **kwargs):
def close(self, conn=None, cursor=None):
"""
Close a connection and cursor, ignoring certain errors.
Return a True value if the connection was closed cleanly. Return
a False value if the processes ignored an error.
"""
for obj in (cursor, conn):
clean = True
for obj in (cursor, conn): # cursor first; some drivers want that done
if obj is not None:
try:
obj.close()
except self.close_exceptions:
pass
except self._ignored_exceptions: # pylint:disable=catching-non-exception
clean = False
return clean

def rollback_and_close(self, conn, cursor):
# Some drivers require the cursor to be closed first.

# Some drivers also don't allow you to close without fetching
# all rows.
def __synchronize_cursor_for_rollback(self, cursor):
"""Exceptions here are ignored, we don't know what state the cursor is in."""
if cursor is not None and self._fetchall_on_rollback:
fetchall = cursor.fetchall
try:
cursor.fetchall()
fetchall()
except Exception: # pylint:disable=broad-except
pass
self.close(cursor)

@staticmethod
def __rollback_connection(conn, ignored_exceptions):
"""Return True if we successfully rolled back."""
clean = True
if conn is not None:
assert cursor is not None
try:
conn.rollback()
except self.close_exceptions:
pass
finally:
self.close(conn)
except ignored_exceptions:
logger.debug("Ignoring exception rolling back connection", exc_info=True)
clean = False
return clean

def __rollback(self, conn, cursor, quietly):
# If an error occurs, close the connection and cursor.
#
# Some drivers require the cursor to be closed before closing
# the connection.
#
# Some drivers also don't allow you to close the cursor
# without fetching all rows.
self.__synchronize_cursor_for_rollback(cursor)
try:
clean = self.__rollback_connection(
conn,
# Let it raise if we're not meant to be quiet.
self._ignored_exceptions if quietly else ()
)
except:
clean = False
raise
finally:
if not clean:
self.close(conn, cursor)
return clean

def rollback_and_close(self, conn, cursor):
clean = self.__rollback(conn, cursor, True)
if clean:
# if an error already occurred, we closed things.
clean = self.close(conn, cursor)

return clean

def rollback(self, conn, cursor):
# Like rollback and close, but doesn't bury exceptions.
if self._fetchall_on_rollback:
cursor.fetchall()
conn.rollback()
return self.__rollback(conn, cursor, False)

def rollback_quietly(self, conn, cursor):
return self.__rollback(conn, cursor, True)

def open_and_call(self, callback):
"""Call a function with an open connection and cursor.
Expand All @@ -147,6 +199,7 @@ def open_and_call(self, callback):
raise
else:
self.close(cursor)
cursor = None
conn.commit()
return res
finally:
Expand Down
107 changes: 89 additions & 18 deletions src/relstorage/adapters/interfaces.py
Expand Up @@ -22,6 +22,39 @@
# pylint:disable=inherit-non-class,no-method-argument,no-self-argument
# pylint:disable=too-many-ancestors

try:
from zope.schema import Tuple
from zope.schema import Object
from zope.interface.common.interfaces import IException
except ImportError: # pragma: no cover
# We have nti.testing -> zope.schema as a test dependency; but we
# don't have it as a hard-coded runtime dependency because we
# don't want to force a version on consumers of RelStorage.
def Tuple(*_args, **kwargs):
return Attribute(kwargs['description'])

Object = Tuple

def Factory(schema, description='', **_kw):
return Attribute(description + " (Must implement %s)" % schema)

IException = Interface
else:
from zope.schema.interfaces import SchemaNotProvided as _SchemaNotProvided
from zope.schema import Field as _Field

class Factory(_Field):
def __init__(self, schema, **kw):
self.schema = schema
_Field.__init__(self, **kw)

def _validate(self, value):
super(Factory, self)._validate(value)
if not self.schema.implementedBy(value):
raise _SchemaNotProvided(self.schema, value).with_field_and_value(self, value)



class IRelStorageAdapter(Interface):
"""A database adapter for RelStorage"""

Expand Down Expand Up @@ -66,20 +99,35 @@ class IDBDriver(Interface):

__name__ = Attribute("The name of this driver")

disconnected_exceptions = Attribute("A tuple of exceptions this driver can raise if it is "
"disconnected from the database.")
close_exceptions = Attribute("A tuple of exceptions that we can ignore when we try to "
"close the connection to the database. Often this is the same "
"or an extension of `disconnected_exceptions`.")

lock_exceptions = Attribute("A tuple of exceptions") # XXX: Document

use_replica_exceptions = Attribute("A tuple of exceptions raised by connecting "
"that should cause us to try a replica.")
disconnected_exceptions = Tuple(
description=(u"A tuple of exceptions this driver can raise on any operation if it is "
u"disconnected from the database."),
value_type=Factory(IException)
)

close_exceptions = Tuple(
description=(u"A tuple of exceptions that we can ignore when we try to "
u"close the connection to the database. Often this is the same "
u"or an extension of `disconnected_exceptions`."
u"These exceptions may also be ignored on rolling back the connection, "
u"if we are otherwise completely done with it and prepared to drop it. "),
value_type=Factory(IException),
)

lock_exceptions = Tuple(
description=u"A tuple of exceptions",
value_type=Factory(IException),
) # XXX: Document

use_replica_exceptions = Tuple(
description=(u"A tuple of exceptions raised by connecting "
u"that should cause us to try a replica."),
value_type=Factory(IException)
)

Binary = Attribute("A callable.")

dialect = Attribute("The IDBDialect for this driver.")
dialect = Object(IDBDialect, description=u"The IDBDialect for this driver.")

def binary_column_as_state_type(db_column_data):
"""
Expand Down Expand Up @@ -209,32 +257,46 @@ def known_driver_factories():
class IConnectionManager(Interface):
"""Open and close database connections"""

disconnected_exceptions = Attribute(
"""The tuple of exception types that might be
raised when the connection to the database has been broken.
""")

def open():
"""Open a database connection and return (conn, cursor)."""

def close(conn=None, cursor=None):
"""
Close a connection and cursor, ignoring certain errors.
Return a True value if the connection was closed cleanly;
return a false value if an error was ignored.
"""

def rollback_and_close(conn, cursor):
"""
Rollback the connection and close it.
Rollback the connection and close it, ignoring certain errors.
Certain database drivers, such as MySQLdb using the SSCursor, require
all cursors to be closed before rolling back (otherwise it generates a
ProgrammingError: 2014 "Commands out of sync").
This method abstracts that.
:return: A true value if the connection was closed without ignoring any exceptions;
if an exception was ignored, returns a false value.
"""

def rollback(conn, cursor):
"""
Like `rollback_and_close`, but without the close, and letting
errors pass.
If an error does happen, then the connection and cursor are closed
before this method returns.
"""

def rollback_quietly(conn, cursor):
"""
Like `rollback_and_close`, but without the close.
:return: A true value if the connection was rolled back without ignoring any exceptions;
if an exception was ignored, returns a false value (and the connection and cursor
are closed before this method returns).
"""

def open_and_call(callback):
Expand Down Expand Up @@ -829,7 +891,16 @@ def commit_phase2(conn, cursor, txn):
"""

def abort(conn, cursor, txn=None):
"""Abort the commit. If txn is not None, phase 1 is also aborted."""
"""
Abort the commit, ignoring certain exceptions.
If *txn* is not None, phase 1 is also aborted.
:return: A true value if the connection was rolled back
without ignoring any exceptions; if an exception was
ignored, returns a false value (and the connection
and cursor are closed before this method returns).
"""


class ReplicaClosedException(Exception):
Expand Down
7 changes: 2 additions & 5 deletions src/relstorage/adapters/mysql/connmanager.py
Expand Up @@ -38,13 +38,10 @@ class MySQLdbConnectionManager(AbstractConnectionManager):

def __init__(self, driver, params, options):
self._params = params.copy()
self.disconnected_exceptions = driver.disconnected_exceptions
self.close_exceptions = driver.close_exceptions
self.use_replica_exceptions = driver.use_replica_exceptions
self._db_connect = driver.connect
self._db_driver = driver
self._fetchall_on_rollback = driver.fetchall_on_rollback
super(MySQLdbConnectionManager, self).__init__(options)
super(MySQLdbConnectionManager, self).__init__(options, driver)

def _alter_params(self, replica):
"""Alter the connection parameters to use the specified replica.
Expand Down Expand Up @@ -91,7 +88,7 @@ def open(self, transaction_mode="ISOLATION LEVEL READ COMMITTED",
self._db_driver.set_autocommit(conn, False)
conn.replica = replica
return conn, cursor
except self.use_replica_exceptions as e:
except self.driver.use_replica_exceptions as e:
if replica is not None:
next_replica = replica_selector.next()
if next_replica is not None:
Expand Down

0 comments on commit 188d9bb

Please sign in to comment.