
Commit

Merge 4c3d6f1 into 5a4a800
jamadden committed Aug 12, 2019
2 parents 5a4a800 + 4c3d6f1 commit 91c800b
Showing 9 changed files with 123 additions and 35 deletions.
5 changes: 5 additions & 0 deletions CHANGES.rst
@@ -44,6 +44,11 @@

- Stop checking the status of ``readCurrent`` OIDs twice.

- Make the gevent MySQL driver yield more frequently while getting
large result sets. Previously it would block in C to read the entire
result set. Now it yields according to the cursor's ``arraysize``.
See :issue:`315`.

3.0a7 (2019-08-07)
==================

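A minimal sketch, not part of this commit, of the batched behavior the changelog entry above describes: rows are pulled in ``fetchmany()`` batches sized by ``cursor.arraysize``, and the greenlet yields to the gevent event loop between batches instead of blocking in C for the whole result set. The helper name and the explicit yield call are illustrative assumptions.

    import gevent

    def fetch_all_cooperatively(cursor):
        """Fetch every row, letting other greenlets run between batches."""
        rows = []
        while True:
            batch = cursor.fetchmany()   # at most cursor.arraysize rows per call
            if not batch:
                break
            rows.extend(batch)
            gevent.sleep()               # yield to any other runnable greenlets
        return rows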
21 changes: 20 additions & 1 deletion src/relstorage/adapters/_abstract_drivers.py
@@ -157,8 +157,12 @@ def _sockets_gevent_monkey_patched(self):
    def set_autocommit(self, conn, value):
        conn.autocommit(value)

    cursor_arraysize = 1024

    def cursor(self, conn):
        return conn.cursor()
        cur = conn.cursor()
        cur.arraysize = self.cursor_arraysize
        return cur

    def debug_connection(self, conn, *extra): # pragma: no cover
        print(conn, *extra)
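For context, a hedged sketch of what the new cursor_arraysize attribute buys: per PEP 249, fetchmany() called with no argument returns at most cursor.arraysize rows, so setting the attribute once in cursor() tunes every batched fetch made through that cursor. The driver and connection arguments and the query are assumptions for illustration, not RelStorage code.

    def first_batch(driver, conn):
        # 'driver' is any AbstractModuleDriver subclass instance; 'conn' is a
        # DB-API connection it opened.  The query is illustrative only.
        cur = driver.cursor(conn)
        assert cur.arraysize == 1024          # applied by cursor() above
        cur.execute("SELECT zoid, tid FROM object_state")
        return cur.fetchmany()                # up to cur.arraysize rows per call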
@@ -183,6 +187,21 @@ def connection_may_need_rollback(self, conn): # pylint:disable=unused-argument

    connection_may_need_commit = connection_may_need_rollback


    def synchronize_cursor_for_rollback(self, cursor):
        """Exceptions here are ignored, we don't know what state the cursor is in."""
        # psycopg2 raises ProgrammingError if we rollback when no results
        # are present on the cursor. mysql-connector-python raises
        # InterfaceError. OTOH, mysqlclient raises nothing and even wants
        # it in certain circumstances.

        if cursor is not None:
            fetchall = cursor.fetchall
            try:
                fetchall()
            except Exception: # pylint:disable=broad-except
                pass

# Things that can be recognized as a pickled state,
# passed to an io.BytesIO reader, and unpickled.

24 changes: 3 additions & 21 deletions src/relstorage/adapters/connmanager.py
@@ -39,14 +39,6 @@ class AbstractConnectionManager(object):
    # when a load connection is opened
    _on_load_opened = ()

    # psycopg2 raises ProgrammingError if we rollback when no results
    # are present on the cursor. mysql-connector-python raises
    # InterfaceError. OTOH, mysqlclient raises nothing and even wants
    # it in certain circumstances.
    #
    # Subclasses should set this statically.
    _fetchall_on_rollback = True

    # The list of exceptions to ignore on a rollback *or* close. We
    # take this as the union of the driver's close exceptions and disconnected
    # exceptions (drivers aren't required to organize them to overlap, but
@@ -90,6 +82,7 @@ def __init__(self, options, driver):

        self._may_need_rollback = driver.connection_may_need_rollback
        self._may_need_commit = driver.connection_may_need_commit
        self._synchronize_cursor_for_rollback = driver.synchronize_cursor_for_rollback
        self._do_commit = driver.commit
        self._do_rollback = driver.rollback

@@ -143,15 +136,6 @@ def close(self, conn=None, cursor=None):
                    clean = False
        return clean

    def __synchronize_cursor_for_rollback(self, cursor):
        """Exceptions here are ignored, we don't know what state the cursor is in."""
        if cursor is not None and self._fetchall_on_rollback:
            fetchall = cursor.fetchall
            try:
                fetchall()
            except Exception: # pylint:disable=broad-except
                pass

    def __rollback_connection(self, conn, ignored_exceptions, restarting):
        """Return True if we successfully rolled back."""
        clean = True
@@ -197,7 +181,7 @@ def __rollback(self, conn, cursor, quietly, restarting):
        #
        # Some drivers also don't allow you to close the cursor
        # without fetching all rows.
        self.__synchronize_cursor_for_rollback(cursor)
        self._synchronize_cursor_for_rollback(cursor)
        try:
            clean = self.__rollback_connection(
                conn,
@@ -238,9 +222,7 @@ def _do_open_for_call(self, callback): # pylint:disable=unused-argument
        return self.open()

    def cursor_for_connection(self, conn):
        cursor = self.driver.cursor(conn)
        cursor.arraysize = 1024
        return cursor
        return self.driver.cursor(conn)

    def open_and_call(self, callback):
        """Call a function with an open connection and cursor.
1 change: 0 additions & 1 deletion src/relstorage/adapters/mysql/connmanager.py
@@ -42,7 +42,6 @@ def __init__(self, driver, params, options):
        self._params = params.copy()
        self._db_connect = driver.connect
        self._db_driver = driver
        self._fetchall_on_rollback = driver.fetchall_on_rollback
        super(MySQLdbConnectionManager, self).__init__(options, driver)

        self.isolation_load = self.isolation_repeatable_read_ro
6 changes: 3 additions & 3 deletions src/relstorage/adapters/mysql/drivers/__init__.py
@@ -74,9 +74,6 @@ class AbstractMySQLDriver(AbstractModuleDriver):
    # automatically handle both these statements (``SET names binary,
    # time_zone = X``).

    # Does this driver need cursor.fetchall() called before a rollback?
    fetchall_on_rollback = False

    def cursor(self, conn):
        cursor = AbstractModuleDriver.cursor(self, conn)
        if self.MY_CHARSET_STMT:
@@ -85,6 +82,9 @@ def cursor(self, conn):
            cursor.execute(self.MY_TIMEZONE_STMT)
        return cursor

    def synchronize_cursor_for_rollback(self, cursor):
        """Does nothing."""

    def callproc_multi_result(self, cursor, proc, args=()):
        """
        Some drivers need extra arguments to execute a statement that
94 changes: 87 additions & 7 deletions src/relstorage/adapters/mysql/drivers/mysqldb.py
@@ -21,6 +21,7 @@
from zope.interface import implementer

from relstorage.adapters.interfaces import IDBDriver
from relstorage.adapters._abstract_drivers import AbstractModuleDriver

from relstorage._util import Lazy

@@ -40,7 +41,8 @@ class MySQLdbDriver(AbstractMySQLDriver):
    PRIORITY_PYPY = 3
    _GEVENT_CAPABLE = False

    fetchall_on_rollback = True
    def synchronize_cursor_for_rollback(self, cursor):
        AbstractModuleDriver.synchronize_cursor_for_rollback(self, cursor)

    @Lazy
    def _strict_cursor(self):
@@ -50,7 +52,7 @@ def _strict_cursor(self):
        # can't run this command now")), although it adds some overhead
        # because of more database communication. And the docstring says you have to
        # call `close()` before making another query, but in practice that
        # doesn't seem to be the case.
        # doesn't seem to be the case. You must consume everything though.
        #
        # For extra strictness/debugging, we can wrap this with our
        # custom debugging cursor.
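A short illustration of the "you must consume everything" constraint noted above, using plain mysqlclient rather than RelStorage code; the connection parameters and queries are assumptions for the example, and the error text is the one quoted in the comment above.

    import MySQLdb
    from MySQLdb.cursors import SSCursor

    conn = MySQLdb.connect(db="relstoragetest")       # connection details are illustrative
    cur = conn.cursor(SSCursor)                       # server-side (mysql_use_result) cursor
    cur.execute("SELECT zoid FROM object_state")
    cur.fetchmany(5)
    # Issuing another query now, with rows still pending, raises
    # ProgrammingError (2014, "Commands out of sync; you can't run this command now").
    cur.fetchall()                                    # consume the rest of the result set
    cur.execute("SELECT 1")                           # now another statement is fine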
@@ -89,11 +91,21 @@ class GeventMySQLdbDriver(MySQLdbDriver):
    _GEVENT_CAPABLE = True
    _GEVENT_NEEDS_SOCKET_PATCH = False

    # If we have more rows than this, it will take multiple trips to
    # the socket and C to read them. OTOH, that's a rough indication
    # of how often we will yield to the event loop. Note that iterating
    # directly over the cursor uses fetchone(), so we will yield for
    # every row. Using fetchall() will yield between fetching this many
    # rows, but all the results will still be returned to the caller
    # for processing in one batch.
    cursor_arraysize = 10

    def __init__(self):
        super(GeventMySQLdbDriver, self).__init__()
        # Replace self._connect (which was MySQLdb.connect) with
        # direct call to our desired class.
        self._connect = self._get_connection_class()
        self._strict_cursor = self._connect.default_cursor

    def get_driver_module(self):
        # Make sure we can use gevent; if we can't the ImportError
@@ -108,25 +120,89 @@ def _get_connection_class(cls):
        if cls._Connection is None:
            # pylint:disable=import-error,no-name-in-module
            from MySQLdb.connections import Connection as Base
            from MySQLdb.cursors import SSCursor as BaseCursor

            from gevent import socket
            wait_read = socket.wait_read # pylint:disable=no-member
            wait_write = socket.wait_write # pylint:disable=no-member
            from gevent import get_hub
            from gevent import sleep
            wait = socket.wait # pylint:disable=no-member

            class Cursor(BaseCursor):
                # Internally this calls mysql_use_result(). The source
                # code for that function has this comment: "There
                # shouldn't be much processing per row because mysql
                # server shouldn't have to wait for the client (and
                # will not wait more than 30 sec/packet)." Empirically
                # that doesn't seem to be true.

                def _fetch_row(self, size=1):
                    # Somewhat surprisingly, if we just wait on read,
                    # we end up blocking forever. This is because of the buffers
                    # maintained inside the MySQL library: we might already have the
                    # rows that we need buffered.
                    # Blocking on write is pointless: by definition we're here to read results
                    # so we can always write. That just forces us to take a trip around the event
                    # loop for no good reason.
                    # Therefore, our best option to periodically yield is to explicitly invoke
                    # gevent.sleep(). Without any time given, it will yield to other ready
                    # greenlets; only sometimes will it force a trip around the event loop.
                    sleep()
                    return BaseCursor._fetch_row(self, size)

                def fetchall(self):
                    result = []
                    fetch = self.fetchmany
                    while 1:
                        # Even if self.rowcount is 0 we must still call fetch()
                        # or we get the connection out of sync.
                        rows = fetch()
                        if not rows:
                            break
                        result.extend(rows)
                        if self.rownumber == self.rowcount:
                            # Avoid a useless extra trip at the end.
                            break
                    return result


            # Prior to mysqlclient 1.4, there was a 'waiter' Connection
            # argument that could be used to do this, but it was removed.
            # So we implement it ourselves.
            class Connection(Base):
                default_cursor = Cursor
                gevent_read_watcher = None
                gevent_write_watcher = None
                gevent_hub = None

                def check_watchers(self):
                    # We can be used from more than one thread in a sequential
                    # fashion.
                    hub = get_hub()
                    if hub is not self.gevent_hub:
                        self.__close_watchers()

                        fileno = self.fileno()
                        hub = self.gevent_hub = get_hub()
                        self.gevent_read_watcher = hub.loop.io(fileno, 1)
                        self.gevent_write_watcher = hub.loop.io(fileno, 2)

                def __close_watchers(self):
                    if self.gevent_read_watcher is not None:
                        self.gevent_read_watcher.close()
                        self.gevent_write_watcher.close()
                        self.gevent_hub = None

                def query(self, query):
                    # From the mysqlclient implementation:
                    # "Since _mysql releases the GIL while querying, we need immutable buffer"
                    if isinstance(query, bytearray):
                        query = bytes(query)

                    fileno = self.fileno()
                    wait_write(fileno)
                    self.check_watchers()

                    wait(self.gevent_write_watcher, hub=self.gevent_hub)
                    self.send_query(query)
                    wait_read(fileno)
                    wait(self.gevent_read_watcher, hub=self.gevent_hub)
                    self.read_query_result()

                # The default implementations of 'rollback' and
@@ -145,5 +221,9 @@ def rollback(self):
                def commit(self):
                    self.query('commit')

                def close(self):
                    self.__close_watchers()
                    Base.close(self)

            cls._Connection = Connection
        return cls._Connection
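A hedged usage sketch, not part of the commit, of what the small cursor_arraysize buys under gevent: a large read no longer starves other greenlets, because iteration yields per row and fetchall() yields between batches of ten rows (per the comment on cursor_arraysize above). The driver and connection arguments and the query are assumptions for illustration.

    import gevent

    def read_everything(driver, conn):
        cur = driver.cursor(conn)                     # arraysize is 10 for this driver
        cur.execute("SELECT zoid, tid FROM object_state")
        return cur.fetchall()                         # sleeps/yields between fetchmany() batches

    def demo(driver, conn):
        # Run a big read alongside a ticker greenlet; before this change the
        # ticker would stall while the whole result set was read in C.
        reader = gevent.spawn(read_everything, driver, conn)
        ticker = gevent.spawn(lambda: [gevent.sleep(0.01) for _ in range(50)])
        gevent.joinall([reader, ticker])
        return reader.get()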
2 changes: 0 additions & 2 deletions src/relstorage/adapters/postgresql/connmanager.py
@@ -47,8 +47,6 @@ def _alter_dsn(self, replica):
        dsn = '%s host=%s' % (self._dsn, replica)
        return dsn

    _fetchall_on_rollback = False

    @metricmethod
    def open(self, isolation=None, deferrable=False, read_only=False,
             replica_selector=None, application_name=None, **kwargs):
2 changes: 2 additions & 0 deletions src/relstorage/adapters/postgresql/drivers/__init__.py
@@ -64,6 +64,8 @@ def get_messages(self, conn):
        return notices


    def synchronize_cursor_for_rollback(self, cursor):
        """Does nothing."""

database_type = 'postgresql'

3 changes: 3 additions & 0 deletions src/relstorage/tests/__init__.py
@@ -373,6 +373,9 @@ def commit(self, conn):
    def rollback(self, conn):
        conn.rollback()

    def synchronize_cursor_for_rollback(self, cursor):
        cursor.fetchall()

class MockObjectMover(object):
    def __init__(self):
        self.data = {} # {oid_int: (state, tid_int)}
