Merge pull request #350 from zodb/faster-pack
Substantially reduce memory usage of packing
jamadden committed Sep 20, 2019
2 parents b26c3cf + 53b7229 commit b1a4c83
Showing 21 changed files with 468 additions and 185 deletions.
9 changes: 9 additions & 0 deletions CHANGES.rst
@@ -37,6 +37,15 @@
``--prepack`` and ``--use-prepack-state`` to the RelStorage, even
when it has been wrapped in a ``zc.zlibstorage``.

- Reduce the amount of memory required to pack a RelStorage through
more careful data structure choices. On CPython 3, the peak
memory usage of the prepack phase can be up to 9 times smaller. On
CPython 2, pre-packing a storage with 30 million rows previously
required 3GB of memory; now it requires about 200MB.

- Use server-side cursors during packing when available, further
reducing the amount of memory required. See :issue:`165`.

3.0a10 (2019-09-04)
===================

98 changes: 93 additions & 5 deletions src/relstorage/_compat.py
@@ -7,10 +7,11 @@
from __future__ import division
from __future__ import print_function


import array
import functools
import os
import platform
import sys
import os

import BTrees
# XXX: This is a private module in ZODB, but it has a lot
@@ -47,12 +48,14 @@
'iterkeys',
'itervalues',

# OID and TID datastructures and algorithms
"OID_TID_MAP_TYPE",
'OID_OBJECT_MAP_TYPE',
'OID_SET_TYPE',
'OidTMap_difference',
'OidTMap_multiunion',
'OidTMap_intersection',
'OidList',

'MAX_TID',
'iteroiditems',
@@ -88,9 +91,12 @@ def list_values(d):
iterkeys = dict.iterkeys # pylint:disable=no-member
itervalues = dict.itervalues # pylint:disable=no-member

# These types need to be atomic for primitive operations,
# so don't accept Python BTree implementations. (Also, on PyPy,
# the Python BTree implementation uses more memory than a dict.)
# OID and TID data structures.
#
# The cache MVCC implementation depends on the map types being atomic
# for primitive operations, so don't accept Python BTree
# implementations. (Also, on PyPy, the Python BTree implementation
# uses more memory than a dict.)
if BTrees.LLBTree.LLBTree is not BTrees.LLBTree.LLBTreePy: # pylint:disable=no-member
OID_TID_MAP_TYPE = BTrees.family64.II.BTree
OID_OBJECT_MAP_TYPE = BTrees.family64.IO.BTree
@@ -124,6 +130,88 @@ def OidSet_difference(c1, c2):

OidSet_discard = set.discard

# Lists of OIDs. These could be simple list() objects, or we can treat
# them as numbers and store them in array.array objects, if we have an
# unsigned 64-bit element type. array.array, just like the C version
# of BTrees, uses less memory on CPython, but has a cost converting
# back and forth between objects and native values. What's the cost?
# Let's measure.
#
# Test: list(xrange(30000000)) vs array.array('L', xrange(30000000))
# on Python 2, with minor modifications (range and 'Q') on Python 3.
#
# list mem | array mem | list time | array time
# CPython 2: 861MB | 228MB | 596ms | 2390ms
# PyPy2 7.1: 229MB | 227MB | 178ms | 1830ms
# CPython 3.7: 2117MB | 232MB | 3680ms | 3150ms
#
# Test: Same as above, but using 300 instead of 30000000
# list time | array time
# CPython 2: 6.28ms | 6.3ms
# PyPy2 7.1: 1.34ms | 1.43ms
# CPython 3.7: 3.69ms | 3.74ms
#
# Slicing x(30000000)[30000:30200]
# list time | array time
# CPython 2: 427ns | 148ns
# PyPy2 7.1*: 138ns | 8950ns
# CPython 3.7: 671ns | 411ns
#
# iterate x(30000000): for _ in x: pass
# list time | array time | small list time | small array time
# CPython 2: 357ms | 604ms | 2640ns | 6050ns
# PyPy2 7.1*: 51ms | 592ms | 601ns | 5910ns
# CPython 3.7: 308ms | 2240ms | 2250ns | 6170ns
# * On PyPy, the test was wrapped in a method for better JIT.
#
# Using BTrees.family64.II.TreeSet(range(30000000))
#
# memory | construction time | iteration time
# CPython 2: 564MB | 2740ms | 520ms
# CPython 3.7: 573MB | 5280ms | 2390ms
#
#
# Observations:
# - Large list() is faster to create on CPython 2, but uses 4x the memory.
# - Large list() is *slower* to create on CPython 3 and uses an incredible
#   9x the memory. The difference from Python 2 likely stems from all
#   Python 3 integers being variable-length long objects, unlike Python 2's
#   machine-word ints; that probably accounts for much of the gap in general.
# - PyPy memory usage is comparable for both list and array (which makes
#   sense, since it has a specialized strategy for lists of integers), but
#   large lists are faster to create for some reason.
# - Creation times for small lists and arrays are basically the same on all
#   platforms.
# - Slicing time of arrays is faster on CPython 2 and 3 but much slower on PyPy.
# - Iterating arrays is substantially slower on all platforms and for all sizes.
# - However, creating arrays is faster than creating 64-bit TreeSets; iteration
# is about the same.
#
# Conclusions:
# Except on PyPy, when working with a large list of OIDs, a 64-bit array.array
# will save a substantial amount of memory. On Python 3, it will probably be slightly
# faster to create too; on both Python 2 and 3 it will be faster and smaller than an equivalent
# TreeSet. Slicing is faster with arrays as well. Iteration is around 3x slower, but that's likely
# to be noise compared to the body of the loop.
# Thus, everywhere except PyPy, if we have an unsigned 64-bit array.array available, that should
# be our choice.
_64bit_array = None
try:
# Find out if we have a native unsigned 64-bit type
array.array('Q', [1])
_64bit_array = functools.partial(array.array, 'Q')
except ValueError:
# We don't. Either we're on Python 2 or the compiler doesn't support 'long long'.
# What about a regular unsigned long? If we're on a 64-bit platform, that
# might be enough.
a = array.array('L', [1])
if a.itemsize >= 8:
_64bit_array = functools.partial(array.array, 'L')

if _64bit_array and not PYPY:
OidList = _64bit_array
else:
OidList = list
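# For callers, ``OidList`` behaves the same regardless of which backing was
# selected above (``array.array`` when an unsigned 64-bit element type exists
# and we're not on PyPy, otherwise a plain list). A rough usage sketch, for
# illustration only:
#
#   oids = OidList(range(1000))   # bulk construction from an iterable of ints
#   chunk = oids[100:200]         # slicing works with either backing
#   total = sum(chunk)            # so does iteration, just slower for arrays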

MAX_TID = BTrees.family64.maxint

def iteroiditems(d):
2 changes: 1 addition & 1 deletion src/relstorage/adapters/_abstract_drivers.py
@@ -168,7 +168,7 @@ def _sockets_gevent_monkey_patched(self):
def set_autocommit(self, conn, value):
conn.autocommit(value)

def cursor(self, conn):
def cursor(self, conn, server_side=False): # pylint:disable=unused-argument
cur = conn.cursor()
cur.arraysize = self.cursor_arraysize
return cur
9 changes: 9 additions & 0 deletions src/relstorage/adapters/connections.py
@@ -235,6 +235,15 @@ def isolated_connection(self):
finally:
self.connmanager.rollback_and_close(conn, cursor)

@contextlib.contextmanager
def server_side_cursor(self):
conn, _ = self.open_if_needed()
ss_cursor = self.connmanager.driver.cursor(conn, server_side=True)
try:
yield ss_cursor
finally:
ss_cursor.close()
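# Sketch of the intended use (``conn`` stands for an instance of this
# connection wrapper; the table and column names are only illustrative).
# Packing uses this to stream large result sets instead of buffering them
# entirely on the client:
#
#   with conn.server_side_cursor() as ss_cursor:
#       ss_cursor.execute("SELECT zoid, tid FROM object_state")
#       for zoid, tid in ss_cursor:
#           ...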

def __repr__(self):
return "<%s at 0x%x active=%s, conn=%r cur=%r>" % (
self.__class__.__name__,
54 changes: 54 additions & 0 deletions src/relstorage/adapters/interfaces.py
@@ -79,6 +79,60 @@ class IDBDriver(Interface):

dialect = Object(IDBDialect, description=u"The IDBDialect for this driver.")

cursor_arraysize = Attribute(
"The value to assign to each new cursor's ``arraysize`` attribute.")

def connect(*args, **kwargs):
"""
Create and return a new connection object.
This connection, and all objects created from it such as cursors,
should be used within a single thread only.
"""

def cursor(connection, server_side=False):
"""
Create and return a new cursor sharing the state of the given
*connection*.

The cursor should be closed when it is no longer needed. The
cursor should be considered forward-only (no backward
scrolling) and ephemeral (results go away when the attached
transaction is committed or rolled back).

For compatibility, previous cursors should not have
outstanding results pending when this is called and while the
returned cursor is used (not all drivers permit multiple
active cursors).

If *server_side* is true (not the default), request that the
driver create a cursor that will **not** buffer the complete
results of a query on the client. Instead, the results should
be streamed from the server in batches. This can reduce the
maximum amount of memory needed to handle results, if done
carefully.

For compatibility, server-side cursors can only be used
to execute a single query.

Most drivers (``psycopg2``, ``psycopg2cffi``, ``pg8000``,
``mysqlclient``) default to buffering the entire result set
on the client before returning from the ``execute`` method. This
can reduce latency and increase overall throughput, but at the
cost of memory, especially if the results will be copied into
different data structures.

Not all drivers support server-side cursors; those that don't
will ignore the request. At this writing, this includes ``pg8000``.
Some drivers (at this writing, only ``gevent MySQLdb``) always use
server-side cursors. The ``cx_Oracle`` driver is unevaluated.

``psycopg2`` and ``psycopg2cffi`` both iterate in chunks of
``cur.itersize`` by default. PyMySQL seems to iterate one row at a
time. ``mysqlclient`` also defaults to iterating one row at a time,
but we patch that to operate in chunks of ``cur.arraysize``.
"""

def binary_column_as_state_type(db_column_data):
"""
Turn *db_column_data* into something that's a valid pickle
38 changes: 35 additions & 3 deletions src/relstorage/adapters/mysql/drivers/__init__.py
@@ -46,6 +46,24 @@ class MySQLDialect(DefaultDialect):
def compiler_class(self):
return MySQLCompiler

class IterateFetchmanyMixin(object):
"""
Mixin to cause us to fetch in batches using fetchmany().
"""
sleep = None
def __iter__(self):
fetch = self.fetchmany
sleep = self.sleep
batch = fetch()
while batch:
for row in batch:
yield row
if sleep is not None:
sleep() # pylint:disable=not-callable
batch = fetch()

next = __next__ = None
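# A concrete cursor class mixes this in ahead of the driver's streaming
# cursor, as the gevent MySQLdb driver does below:
#
#   class Cursor(IterateFetchmanyMixin, SSCursor):
#       ...
#
# so that ``for row in cursor:`` fetches rows in batches of
# ``cursor.arraysize`` instead of one at a time, optionally calling
# ``sleep()`` between batches (e.g., to yield to the gevent hub).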

class AbstractMySQLDriver(AbstractModuleDriver):

# Don't try to decode pickle states as UTF-8 (or whatever the
@@ -79,11 +97,25 @@ class AbstractMySQLDriver(AbstractModuleDriver):
# automatically handle both these statements (``SET names binary,
# time_zone = X``).

def cursor(self, conn):
cursor = AbstractModuleDriver.cursor(self, conn)
_server_side_cursor = None
_ignored_fetchall_on_set_exception = ()

def _make_cursor(self, conn, server_side=False):
if server_side:
cursor = conn.cursor(self._server_side_cursor)
cursor.arraysize = self.cursor_arraysize
else:
cursor = super(AbstractMySQLDriver, self).cursor(conn, server_side=False)
return cursor

def cursor(self, conn, server_side=False):
cursor = self._make_cursor(conn, server_side=server_side)
for stmt in self.CURSOR_INIT_STMTS:
cursor.execute(stmt)
cursor.fetchall()
try:
cursor.fetchall()
except self._ignored_fetchall_on_set_exception:
pass
return cursor

def synchronize_cursor_for_rollback(self, cursor):
14 changes: 3 additions & 11 deletions src/relstorage/adapters/mysql/drivers/_mysqldb_gevent.py
@@ -24,10 +24,11 @@

# pylint:disable=wrong-import-position,no-name-in-module,import-error
from MySQLdb.connections import Connection as BaseConnection
from MySQLdb.cursors import SSCursor as BaseCursor
from MySQLdb.cursors import SSCursor

from . import IterateFetchmanyMixin

class Cursor(BaseCursor):
class Cursor(IterateFetchmanyMixin, SSCursor):
# Internally this calls mysql_use_result(). The source
# code for that function has this comment: "There
# shouldn't be much processing per row because mysql
@@ -83,15 +84,6 @@ def fetchall(self):
self.sleep()
return result

def __iter__(self):
fetch = self.fetchmany
batch = fetch()
while batch:
for row in batch:
yield row
self.sleep()
batch = fetch()

def enter_critical_phase_until_transaction_end(self):
# May be entered multiple times.
if 'sleep' not in self.__dict__:
19 changes: 14 additions & 5 deletions src/relstorage/adapters/mysql/drivers/mysqlconnector.py
@@ -56,7 +56,9 @@ def __init__(self):
# conn.close() -> InternalError: Unread result found
# By the time we get to a close(), it's too late to do anything about it.
self.close_exceptions += (self.driver_module.InternalError,)

# Ignore "no result set to fetch from" when we do our init
# statements.
self._ignored_fetchall_on_set_exception = (self.driver_module.InterfaceError,)
if self.Binary is str:
self.Binary = bytearray

@@ -161,6 +163,9 @@ def connect(self, *args, **kwargs):
converter_class = self._get_converter_class()
kwargs['converter_class'] = converter_class
kwargs['get_warnings'] = True
# By default, make it fetch all rows for the cursor,
# like most drivers do.
kwargs['buffered'] = True

con = self.driver_module.connect(*args, **kwargs)

@@ -178,10 +183,14 @@ def cursor(self, conn):
AbstractMySQLDriver.MY_TIMEZONE_STMT,
)

def cursor(self, conn):
cur = super(PyMySQLConnectorDriver, self).cursor(conn)
cur.connection = conn
return cur
def cursor(self, conn, server_side=False):
if server_side:
cursor = conn.cursor(buffered=False)
cursor.arraysize = self.cursor_arraysize
else:
cursor = super(PyMySQLConnectorDriver, self).cursor(conn, server_side=server_side)
cursor.connection = conn
return cursor

def set_autocommit(self, conn, value):
# This implementation uses a property instead of a method.
