Skip to content

Commit

Permalink
Make copying transactions from history-free RelStorage use much less …
Browse files Browse the repository at this point in the history
…memory.

And be safer: it removes temp blobs when done with them.
  • Loading branch information
jamadden committed Sep 22, 2019
1 parent 47c4b13 commit 794eb8e
Show file tree
Hide file tree
Showing 10 changed files with 219 additions and 103 deletions.
10 changes: 10 additions & 0 deletions CHANGES.rst
Expand Up @@ -51,6 +51,16 @@
using the storage or ``sync()`` is called). This prevents data loss
in some cases. See :issue:`344`.

- Make copying transactions *from* a history-free RelStorage (e.g., with
``zodbconvert``) require substantially less memory (75% less).

- Make copying transactions *to* a RelStorage clean up temporary blob
files.

- Make ``zodbconvert`` log progress at intervals instead of for every
transaction. Logging every transaction could add significant overhead
unless stdout was redirected to a file.

3.0a10 (2019-09-04)
===================

Expand Down
14 changes: 7 additions & 7 deletions src/relstorage/_compat.py
Expand Up @@ -130,12 +130,12 @@ def OidSet_difference(c1, c2):

OidSet_discard = set.discard

# Lists of OIDs. These could be simple list() objects, or we can treat
# them as numbers and store them in array.array objects, if we have an
# unsigned 64-bit element type. array.array, just like the C version
# of BTrees, uses less memory or CPython, but has a cost converting
# back and forth between objects and native values. What's the cost?
# Let's measure.
# Lists of OIDs or TIDs. These could be simple list() objects, or we
# can treat them as numbers and store them in array.array objects, if
# we have an unsigned 64-bit element type. array.array, just like the
# C version of BTrees, uses less memory or CPython, but has a cost
# converting back and forth between objects and native values. What's
# the cost? Let's measure.
#
# Test: list(xrange(30000000)) vs array.array('L', xrange(30000000))
# on Python 2, with minor modifications (range and 'Q') on Python 3.
Expand Down Expand Up @@ -211,7 +211,7 @@ def OidSet_difference(c1, c2):
OidList = _64bit_array
else:
OidList = list

TidList = OidList
MAX_TID = BTrees.family64.maxint

def iteroiditems(d):
Expand Down
134 changes: 94 additions & 40 deletions src/relstorage/adapters/dbiter.py
Expand Up @@ -14,9 +14,13 @@
from __future__ import absolute_import
from __future__ import print_function

from collections import namedtuple

from zope.interface import implementer

from relstorage._compat import MAX_TID
from relstorage._compat import TidList
from relstorage._util import Lazy

from .interfaces import IDatabaseIterator
from .schema import Schema
Expand All @@ -33,6 +37,14 @@ def __init__(self, database_driver):
"""
self.driver = database_driver

@Lazy
def _as_state(self):
return self.driver.binary_column_as_state_type

@Lazy
def _as_bytes(self):
return self.driver.binary_column_as_bytes

_iter_objects_query = Schema.object_state.select(
it.c.zoid,
it.c.state
Expand All @@ -48,10 +60,21 @@ def iter_objects(self, cursor, tid):
Yields ``(oid, state)`` for each object in the transaction.
"""
self._iter_objects_query.execute(cursor, {'tid': tid})
as_state = self._as_state
for oid, state in cursor:
state = self.driver.binary_column_as_state_type(state)
state = as_state(state) # pylint:disable=too-many-function-args
yield oid, state

class _HistoryPreservingTransactionRecord(namedtuple(
'_HistoryPreservingTransactionRecord',
('tid_int', 'username', 'description', 'extension', 'packed')
)):
__slots__ = ()

@property
def pickle_size(self):
return self.packed


@implementer(IDatabaseIterator)
class HistoryPreservingDatabaseIterator(DatabaseIterator):
Expand All @@ -62,28 +85,36 @@ def _transaction_iterator(self, cursor):
"""
Iterate over a list of transactions returned from the database.
Each row begins with ``(tid, username, description, extension)``
and may have other columns.
Each row is ``(tid, username, description, extension, X)``
"""
# Iterating the cursor itself in a generator is not safe if
# the cursor doesn't actually buffer all the rows *anyway*. If
# we break from the iterating loop before exhausting all the
# rows, a subsequent query or close operation can lead to
# things like MySQL Connector/Python raising
# InternalError(unread results)
rows = cursor.fetchall()
for row in rows:
tid, username, description, ext = row[:4]
# Although the transaction interface for username and description are
# defined as strings, this layer works with bytes. PY3.
username = self.driver.binary_column_as_bytes(username)
description = self.driver.binary_column_as_bytes(description)
ext = self.driver.binary_column_as_bytes(ext)

yield (tid, username, description, ext) + tuple(row[4:])
# Because we have it all in memory anyway, there's not much point in
# making this a generator.

# Although the transaction interface for username and description are
# defined as strings, this layer works with bytes. The ZODB layer
# does the conversion.
as_bytes = self._as_bytes
# pylint:disable=too-many-function-args
return [
_HistoryPreservingTransactionRecord(
tid,
as_bytes(username),
as_bytes(description),
as_bytes(ext),
packed
)
for (tid, username, description, ext, packed)
in cursor
]

_iter_transactions_query = Schema.transaction.select(
it.c.tid, it.c.username, it.c.description, it.c.extension
it.c.tid, it.c.username, it.c.description, it.c.extension, 0
).where(
it.c.packed == False # pylint:disable=singleton-comparison
).and_(
Expand Down Expand Up @@ -116,11 +147,8 @@ def iter_transactions(self, cursor):
)

def iter_transactions_range(self, cursor, start=None, stop=None):
"""Iterate over the transactions in the given range, oldest first.
Includes packed transactions.
Yields (tid, username, description, extension, packed)
for each transaction.
"""
See `IDatabaseIterator`.
"""
params = {
'min_tid': start if start else 0,
Expand Down Expand Up @@ -149,11 +177,9 @@ def iter_transactions_range(self, cursor, start=None, stop=None):
)

def iter_object_history(self, cursor, oid):
"""Iterate over an object's history.
"""
See `IDatabaseIterator`
Raises KeyError if the object does not exist.
Yields (tid, username, description, extension, pickle_size)
for each modification.
"""
params = {'oid': oid}
self._object_exists_query.execute(cursor, params)
Expand All @@ -163,22 +189,54 @@ def iter_object_history(self, cursor, oid):
self._object_history_query.execute(cursor, params)
return self._transaction_iterator(cursor)

class _HistoryFreeTransactionRecord(object):
__slots__ = ('tid_int',)

username = b''
description = b''
extension = b''
packed = True

def __init__(self, tid):
self.tid_int = tid


class _HistoryFreeObjectHistoryRecord(_HistoryFreeTransactionRecord):
__slots__ = ('pickle_size',)

def __init__(self, tid, size):
_HistoryFreeTransactionRecord.__init__(self, tid)
self.pickle_size = size


class _HistoryFreeTransactionRange(object):
# By storing just the int, and materializing the records on demand, we
# save substantial amounts of memory. For example, 18MM records on PyPy
# went from about 3.5GB to about 0.5GB
__slots__ = (
'tid_ints',
)

def __init__(self, tid_ints):
self.tid_ints = tid_ints

def __len__(self):
return len(self.tid_ints)

def __getitem__(self, ix):
return _HistoryFreeTransactionRecord(self.tid_ints[ix])

@implementer(IDatabaseIterator)
class HistoryFreeDatabaseIterator(DatabaseIterator):

keep_history = False

def iter_transactions(self, cursor):
"""Iterate over the transaction log, newest first.
Skips packed transactions.
Yields ``(tid, username, description, extension)`` for each transaction.
"""
This always returns an empty iterable.
"""
# pylint:disable=unused-argument
return []
return ()

_iter_transactions_range_query = Schema.object_state.select(
it.c.tid,
Expand All @@ -191,18 +249,15 @@ def iter_transactions(self, cursor):
).distinct()

def iter_transactions_range(self, cursor, start=None, stop=None):
"""Iterate over the transactions in the given range, oldest first.
Includes packed transactions.
Yields ``(tid, username, description, extension, packed)``
for each transaction.
"""
See `IDatabaseIterator`.
"""
params = {
'min_tid': start if start else 0,
'max_tid': stop if stop else MAX_TID
}
self._iter_transactions_range_query.execute(cursor, params)
return ((tid, b'', b'', b'', True) for (tid,) in cursor)
return _HistoryFreeTransactionRange(TidList((tid for (tid,) in cursor)))

_iter_object_history_query = Schema.object_state.select(
it.c.tid, it.c.state_size
Expand All @@ -212,15 +267,14 @@ def iter_transactions_range(self, cursor, start=None, stop=None):

def iter_object_history(self, cursor, oid):
"""
Iterate over an object's history.
See `IDatabaseIterator`
Raises KeyError if the object does not exist.
Yields a single row,
``(tid, username, description, extension, pickle_size)``
Yields a single row.
"""
self._iter_object_history_query.execute(cursor, {'oid': oid})
rows = cursor.fetchall()
if not rows:
raise KeyError(oid)
assert len(rows) == 1
return [(tid, '', '', b'', size) for (tid, size) in rows]
tid, size = rows[0]
return [_HistoryFreeObjectHistoryRecord(tid, size)]
28 changes: 18 additions & 10 deletions src/relstorage/adapters/interfaces.py
Expand Up @@ -551,26 +551,34 @@ def iter_objects(cursor, tid):
"""

def iter_transactions(cursor):
"""Iterate over the transaction log, newest first.
"""
Iterate over the transaction log, newest first.
Skips packed transactions.
Yields (tid, username, description, extension) for each transaction.
Skips packed transactions. Yields (tid, username, description,
extension) for each transaction.
"""

def iter_transactions_range(cursor, start=None, stop=None):
"""Iterate over the transactions in the given range, oldest first.
"""
Return an indexable object over the transactions in the given range, oldest
first.
Includes packed transactions.
Yields (tid, username, description, extension, packed)
for each transaction.
Has an object with the properties ``tid_int``, ``username``
(bytes) ``description`` (bytes) ``extension`` (bytes) and
``packed`` (boolean) for each transaction.
"""

def iter_object_history(cursor, oid):
"""Iterate over an object's history.
"""
Iterate over an object's history.
Yields an object with the properties ``tid_int``, ``username``
(bytes) ``description`` (bytes) ``extension`` (bytes) and
``pickle_size`` (int) for each transaction.
Raises KeyError if the object does not exist.
Yields (tid, username, description, extension, state_size)
for each modification.
:raises KeyError: if the object does not exist
"""


Expand Down
3 changes: 2 additions & 1 deletion src/relstorage/adapters/oracle/tests/test_dialect.py
Expand Up @@ -22,6 +22,7 @@ class Context(object):

class Driver(object):
dialect = OracleDialect()
binary_column_as_state_type = binary_column_as_bytes = lambda b: b

class TestOracleDialect(TestCase):

Expand Down Expand Up @@ -170,7 +171,7 @@ def test_iter_transactions(self):

self.assertEqual(
stmt,
'SELECT tid, username, description, extension '
'SELECT tid, username, description, extension, 0 '
'FROM transaction '
"WHERE ((packed = 'N' AND tid <> :literal_0)) "
'ORDER BY tid DESC'
Expand Down
13 changes: 10 additions & 3 deletions src/relstorage/storage/__init__.py
Expand Up @@ -423,8 +423,12 @@ def registerDB(self, wrapper):
if hasattr(wrapper, 'base') and hasattr(wrapper, 'copied_methods'):
type(wrapper).new_instance = _zlibstorage_new_instance
type(wrapper).pack = _zlibstorage_pack
# NOTE that zlibstorage has a custom copyTransactionsFrom that hides
# our own implementation.
from zc.zlibstorage import _Iterator
_Iterator.__len__ = _zlibstorage_Iterator_len
# zc.zlibstorage has a custom copyTransactionsFrom that hides
# our own implementation. It just uses ZODb.blob.copyTransactionsFromTo.
# Use our implementation.
wrapper.copyTransactionsFrom = self.copyTransactionsFrom
else:
wrapper.new_instance = lambda s: type(wrapper)(self.new_instance())

Expand Down Expand Up @@ -535,7 +539,7 @@ def iterator(self, start=None, stop=None):
if self.keep_history:
return HistoryPreservingTransactionIterator(self._adapter, start, stop)
return HistoryFreeTransactionIterator(
self._adapter, self._load_connection.cursor, start, stop)
self._adapter, self._load_connection, start, stop)


def afterCompletion(self):
Expand Down Expand Up @@ -835,3 +839,6 @@ def _zlibstorage_pack(self, pack_time, referencesf, *args, **kwargs):
def refs(state, oids=None):
return referencesf(untransform(state), oids)
return self.base.pack(pack_time, refs, *args, **kwargs)

def _zlibstorage_Iterator_len(self):
return len(self._base_it)

0 comments on commit 794eb8e

Please sign in to comment.