
Merge pull request #369 from zodb/faster-pg-gevent
Restore the RowBatcher for non-COPY capable PostgreSQL drivers.
jamadden committed Oct 25, 2019
2 parents 442ddc3 + 00d6bbf commit dbd9a64
Showing 8 changed files with 238 additions and 99 deletions.
3 changes: 3 additions & 0 deletions CHANGES.rst
@@ -13,6 +13,9 @@
- SQLite: Improve the speed of copying transactions into a SQLite
storage (e.g., with zodbconvert).

- PostgreSQL: Improve the speed of writes when using the 'gevent
psycopg2' driver.

3.0b1 (2019-10-22)
==================

49 changes: 49 additions & 0 deletions src/relstorage/adapters/drivers.py
@@ -348,3 +348,52 @@ def implement_db_driver_options(name, *driver_modules):

module.select_driver = lambda driver_name=None: _select_driver_by_name(driver_name,
sys.modules[name])


try:
import gevent
except ImportError:
pass
else:
from gevent.socket import wait
get_hub = gevent.get_hub

class GeventConnectionMixin(object):

def __init__(self, *args, **kwargs):
self.gevent_hub = None
self.gevent_read_watcher = None
self.gevent_write_watcher = None
super(GeventConnectionMixin, self).__init__(*args, **kwargs)

def close(self):
self.__close_watchers()
super(GeventConnectionMixin, self).close()

def __check_watchers(self):
# We can be used from more than one thread in a sequential
# fashion.
hub = get_hub()
if hub is not self.gevent_hub:
self.__close_watchers()

fileno = self.fileno()
hub = self.gevent_hub = get_hub()
self.gevent_read_watcher = hub.loop.io(fileno, 1)
self.gevent_write_watcher = hub.loop.io(fileno, 2)

def __close_watchers(self):
if self.gevent_read_watcher is not None:
self.gevent_read_watcher.close()
self.gevent_write_watcher.close()
self.gevent_hub = None

def gevent_wait_read(self):
self.__check_watchers()
wait(self.gevent_read_watcher,
hub=self.gevent_hub)

def gevent_wait_write(self):
self.__check_watchers()
wait(self.gevent_write_watcher,
hub=self.gevent_hub)
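
The mixin cooperates via the MRO: ``close()`` tears down the watchers before delegating to the real connection class with ``super()``, and the two ``gevent_wait_*`` methods yield to the hub until the socket is ready. A minimal sketch of a consumer, mirroring what the MySQL driver below does (the base class and method names other than the mixin's are hypothetical):

    class ExampleDriverConnection(GeventConnectionMixin,
                                  SomeBlockingDriverConnection):

        def query(self, sql):
            # Yield to the gevent hub until the socket is writable,
            # send the query, then yield again until the reply is
            # readable before consuming it.
            self.gevent_wait_write()
            self.send_query(sql)
            self.gevent_wait_read()
            return self.read_query_result()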
118 changes: 102 additions & 16 deletions src/relstorage/adapters/mover.py
@@ -37,6 +37,12 @@
object_state = Schema.object_state


def _compute_md5sum(_self, data):
    # A module-level function (hence the ``_self`` placeholder) so it
    # can be installed as a method on more than one class; see
    # AbstractObjectMover and RowBatcherStoreTemps below.
    if data is None:
        return None
    return md5(data).hexdigest()



@implementer(IObjectMover)
class AbstractObjectMover(DatabaseHelpersMixin, ABC):
@@ -55,11 +61,7 @@ def __init__(self, database_driver, options, runner=None,
self.version_detector = version_detector
self.make_batcher = batcher_factory

@noop_when_history_free
def _compute_md5sum(self, data):
if data is None:
return None
return md5(data).hexdigest()
_compute_md5sum = noop_when_history_free(_compute_md5sum)

_load_current_query = objects.select(
objects.c.state, objects.c.tid
@@ -242,18 +244,42 @@ def on_load_opened(self, cursor, restart=False):

@metricmethod_sampled
def store_temps(self, cursor, state_oid_tid_iter):
"""
Uses the cursor's ``executemany`` method to store temporary
objects.
If there is a more optimal way to implement putting objects in
the database, please do so.
- On SQLite, ``executemany`` is implemented in C by looping
  over the provided iterator, which turns out to be exactly
  what the normal ``execute`` method does (it just uses a
  one-row iterator). So ``executemany`` saves substantial
  setup overhead dealing with SQLite's prepared statements.
- On PostgreSQL, we use COPY for this (unless we're using
  the 'gevent psycopg2' driver; it's the only supported
  driver that doesn't support COPY). None of the supported
  PostgreSQL drivers has a good ``executemany`` method, so
  drivers that can't COPY fall back to our own RowBatcher.
- On Oracle, we use the RowBatcher with a combination of
bulk array operations and direct inserts.
- On MySQL, the preferred driver (mysqlclient) has a
  decent implementation of ``executemany`` for INSERT or
  REPLACE (basically an optimized form of what our
  RowBatcher does). That implementation is shared with
  PyMySQL as well, but it only kicks in for a simple
  INSERT statement matching a regular expression. Note
  that it has a bug, though: it can't handle an empty
  iterator.
"""
query = self._store_temp_query
do_md5 = self._compute_md5sum
Binary = self.driver.Binary
# On Postgresql, we use COPY for this.
# On Oracle, we use the RowBatcher.
# On SQLite, executemany is implemented in C looping around our
# iterator; that still saves substantial setup overhead in the statement
# cache.
# On MySQL, the preferred driver (mysqlclient) has a decent implementation
# of executemany for INSERT; that's shared with PyMySQL as well, but it must be
# a simple INSERT statement matching a regular expression. Note that it has a bug though:
# it can't handle an iterator that's empty.

query.executemany(
cursor,
(
@@ -265,8 +291,13 @@ def store_temps(self, cursor, state_oid_tid_iter):

@metricmethod_sampled
def replace_temps(self, cursor, state_oid_tid_iter):
# Reuse the upsert query. The same comments apply. In particular,
# MySQLclient won't optimize an UPDATE in the same way it does an INSERT.
"""
Assumes that ``store_temps`` is using an upsert query and simply calls
that method.
The same comments apply. In particular,
MySQLclient won't optimize an UPDATE in the same way it does an INSERT.
"""
self.store_temps(cursor, state_oid_tid_iter)
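
Since the docstring above calls out mysqlclient's inability to handle an empty iterator, here is a minimal sketch (not part of this diff; the helper name is hypothetical) of how a caller that can't guarantee a non-empty batch could guard against that bug:

    import itertools

    def executemany_allowing_empty(cursor, query, rows):
        # mysqlclient's optimized executemany() chokes on an empty
        # iterator, so peek at the first row before delegating.
        rows = iter(rows)
        try:
            first = next(rows)
        except StopIteration:
            return  # nothing to store
        cursor.executemany(query, itertools.chain((first,), rows))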

@metricmethod_sampled
@@ -601,3 +632,58 @@ def upload_blob(self, cursor, oid, tid, filename):
cursor, oid, tid, filename,
self._upload_blob_uses_chunks, insert_stmt, use_tid
)


class RowBatcherStoreTemps(object):
"""
A helper class to implement ``store_temps`` using a RowBatcher.
You must provide an implementation of
:meth:`store_temp_into_batcher` and it must be an upsert. The
:meth:`generic_store_temp_into_batcher` method can be used to help
with this.
"""

def __init__(self, keep_history, binary, batcher_factory=RowBatcher):
self.make_batcher = batcher_factory
self.keep_history = keep_history
self.binary = binary

_compute_md5sum = noop_when_history_free(_compute_md5sum)

@metricmethod_sampled
def store_temps(self, cursor, state_oid_tid_iter):
store_temp = self.store_temp_into_batcher
batcher = self.make_batcher(cursor) # Default row limit
for data, oid_int, tid_int in state_oid_tid_iter:
store_temp(batcher, oid_int, tid_int, data)
batcher.flush()

replace_temps = store_temps

# The _generic methods allow for UPSERTs, at least on MySQL
# and PostgreSQL. Previously, MySQL used `command='REPLACE'`
# for an UPSERT; now it uses a suffix 'ON DUPLICATE KEY UPDATE ...'.
# PostgreSQL uses a suffix 'ON CONFLICT (...) DO UPDATE ...'.

generic_command = 'INSERT'
generic_suffix = ''

def generic_store_temp_into_batcher(self, batcher, oid, prev_tid, data):
md5sum = self._compute_md5sum(data)
command = self.generic_command
suffix = self.generic_suffix
# TODO: Now that we guarantee not to feed duplicates here, drop
# the conflict handling.
if command == 'INSERT' and not suffix:
batcher.delete_from('temp_store', zoid=oid)
batcher.insert_into(
"temp_store (zoid, prev_tid, md5, state)",
batcher.row_schema_of_length(4),
(oid, prev_tid, md5sum, self.binary(data)),
rowkey=oid,
size=len(data) + 32,
command=command,
suffix=suffix
)

store_temp_into_batcher = generic_store_temp_into_batcher
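
Since the point of this commit is letting non-COPY-capable PostgreSQL drivers (currently just 'gevent psycopg2') reuse the RowBatcher path, here is a hedged sketch of how a subclass might plug in via the generic hooks. The column names follow the temp_store schema shown above; the exact suffix text is illustrative, not necessarily what RelStorage ships:

    class ExamplePostgreSQLStoreTemps(RowBatcherStoreTemps):
        # With a non-empty suffix, generic_store_temp_into_batcher
        # skips the delete_from() conflict fallback and lets the
        # database perform the upsert itself.
        generic_suffix = (
            ' ON CONFLICT (zoid) DO UPDATE'
            ' SET prev_tid = excluded.prev_tid,'
            '     md5 = excluded.md5,'
            '     state = excluded.state'
        )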
36 changes: 5 additions & 31 deletions src/relstorage/adapters/mysql/drivers/_mysqldb_gevent.py
@@ -16,16 +16,13 @@
from __future__ import division
from __future__ import print_function

from gevent import socket
from gevent import get_hub
from gevent import sleep as gevent_sleep
wait = socket.wait # pylint:disable=no-member


# pylint:disable=wrong-import-position,no-name-in-module,import-error
from MySQLdb.connections import Connection as BaseConnection
from MySQLdb.cursors import SSCursor

from relstorage.adapters.drivers import GeventConnectionMixin
from . import IterateFetchmanyMixin

class Cursor(IterateFetchmanyMixin, SSCursor):
@@ -92,7 +89,8 @@ def fetchall(self):
def _noop():
"Does nothing"

class Connection(BaseConnection):
class Connection(GeventConnectionMixin,
BaseConnection):
default_cursor = Cursor
gevent_read_watcher = None
gevent_write_watcher = None
@@ -114,24 +112,6 @@ def exit_critical_phase_at_transaction_end(self):
del self.query
del self.sleep

def check_watchers(self):
# We can be used from more than one thread in a sequential
# fashion.
hub = get_hub()
if hub is not self.gevent_hub:
self.__close_watchers()

fileno = self.fileno()
hub = self.gevent_hub = get_hub()
self.gevent_read_watcher = hub.loop.io(fileno, 1)
self.gevent_write_watcher = hub.loop.io(fileno, 2)

def __close_watchers(self):
if self.gevent_read_watcher is not None:
self.gevent_read_watcher.close()
self.gevent_write_watcher.close()
self.gevent_hub = None

def _critical_query(self, query):
return BaseConnection.query(self, query)

@@ -141,12 +121,10 @@ def query(self, query):
if isinstance(query, bytearray):
query = bytes(query)

self.check_watchers()

wait(self.gevent_write_watcher, hub=self.gevent_hub)
self.gevent_wait_write()
self.send_query(query)

wait(self.gevent_read_watcher, hub=self.gevent_hub)
self.gevent_wait_read()
self.read_query_result()

# The default implementations of 'rollback' and
@@ -177,10 +155,6 @@ def _critical_commit(self):
def commit(self):
self.query('commit')

def close(self):
self.__close_watchers()
BaseConnection.close(self)

def __delattr__(self, name):
# BaseConnection has a delattr that forbids
# all deletion. Not helpful.
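
The critical-phase machinery above works by instance-attribute shadowing: binding a blocking implementation as an instance attribute hides the cooperative class-level method, and deleting it (which the relaxed ``__delattr__`` permits) restores the class version. A minimal sketch of the idiom, with made-up names:

    class Demo(object):
        def respond(self):
            return 'cooperative'

        def enter_critical_phase(self):
            # The instance attribute shadows the class-level method.
            self.respond = lambda: 'blocking'

        def exit_critical_phase(self):
            # Deleting it exposes the class-level method again.
            del self.respond

    d = Demo()
    d.enter_critical_phase()
    assert d.respond() == 'blocking'
    d.exit_critical_phase()
    assert d.respond() == 'cooperative'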
81 changes: 39 additions & 42 deletions src/relstorage/adapters/oracle/mover.py
@@ -22,51 +22,12 @@

from ..interfaces import IObjectMover
from ..mover import AbstractObjectMover
from ..mover import RowBatcherStoreTemps
from ..mover import metricmethod_sampled

class OracleRowBatcherStoreTemps(RowBatcherStoreTemps):

@implementer(IObjectMover)
class OracleObjectMover(AbstractObjectMover):

# This is assigned to by the adapter.
inputsizes = None


@metricmethod_sampled
def get_object_tid_after(self, cursor, oid, tid):
"""Returns the tid of the next change after an object revision.
Returns None if no later state exists.
"""
stmt = """
SELECT MIN(tid)
FROM object_state
WHERE zoid = :1
AND tid > :2
"""
cursor.execute(stmt, (oid, tid))
rows = cursor.fetchall()
if rows:
# XXX: If we can use rowcount here, we can combine
# with superclass.
assert len(rows) == 1
return rows[0][0]

# no store connection initialization needed for Oracle
def on_store_opened(self, cursor, restart=False):
pass

@metricmethod_sampled
def store_temps(self, cursor, state_oid_tid_iter):
store_temp = self._store_temp
batcher = self.make_batcher(cursor) # Default row limit
for data, oid_int, tid_int in state_oid_tid_iter:
store_temp(batcher, oid_int, tid_int, data)
batcher.flush()


@metricmethod_sampled
def _store_temp(self, batcher, oid, prev_tid, data):
def store_temp_into_batcher(self, batcher, oid, prev_tid, data):
md5sum = self._compute_md5sum(data)

size = len(data)
@@ -97,6 +58,42 @@ def _store_temp(self, batcher, oid, prev_tid, data):
size=size,
)


@implementer(IObjectMover)
class OracleObjectMover(OracleRowBatcherStoreTemps,
AbstractObjectMover):

# This is assigned to by the adapter.
inputsizes = None

def __init__(self, *args, **kwargs):
AbstractObjectMover.__init__(self, *args, **kwargs)
OracleRowBatcherStoreTemps.__init__(self, self.keep_history, self.driver.Binary)

@metricmethod_sampled
def get_object_tid_after(self, cursor, oid, tid):
"""Returns the tid of the next change after an object revision.
Returns None if no later state exists.
"""
stmt = """
SELECT MIN(tid)
FROM object_state
WHERE zoid = :1
AND tid > :2
"""
cursor.execute(stmt, (oid, tid))
rows = cursor.fetchall()
if rows:
# XXX: If we can use rowcount here, we can combine
# with superclass.
assert len(rows) == 1
return rows[0][0]

# no store connection initialization needed for Oracle
def on_store_opened(self, cursor, restart=False):
pass

@metricmethod_sampled
def restore(self, cursor, batcher, oid, tid, data):
"""Store an object directly, without conflict detection.