Commit fc51f56

Merge 74c7288 into 3e50d8b

jamadden committed Aug 29, 2020
2 parents 3e50d8b + 74c7288 commit fc51f56
Showing 9 changed files with 187 additions and 4 deletions.
1 change: 1 addition & 0 deletions .travis.yml
@@ -60,6 +60,7 @@ script:
- pip uninstall -y zope.schema && python -c 'import relstorage.interfaces, relstorage.adapters.interfaces, relstorage.cache.interfaces'
after_success:
- coverage combine
- coverage report -i --skip-covered
- coveralls
notifications:
email: false
7 changes: 6 additions & 1 deletion CHANGES.rst
@@ -5,7 +5,12 @@
3.2.2 (unreleased)
==================

- Nothing changed yet.
- The "MySQLdb" driver didn't properly use server-side cursors when
requested. This would result in unexpected increased memory usage
for things like packing and storage iteration.

- Make RelStorage instances implement
``IStorageCurrentRecordIteration``. See :issue:`389`.


3.2.1 (2020-08-28)
30 changes: 30 additions & 0 deletions src/relstorage/adapters/dbiter.py
@@ -18,6 +18,8 @@

from zope.interface import implementer

from ZODB.utils import p64 as int64_to_8bytes

from relstorage._compat import TidList
from relstorage._util import Lazy

@@ -65,6 +67,34 @@ def iter_objects(self, cursor, tid):
state = as_state(state) # pylint:disable=too-many-function-args
yield oid, state

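# Query selecting the current (zoid, tid, state) row for every object whose
# zoid is at least the bound ``start_oid`` parameter, in ascending zoid order.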
_iter_current_records = Schema.all_current_object_state.select(
it.c.zoid,
it.c.tid,
it.c.state
).where(
it.c.zoid >= it.bindparam('start_oid')
).order_by(
it.c.zoid
)

def iter_current_records(self, cursor, start_oid_int=0):
"""
Cause the *cursor* (which should be a server-side cursor)
to execute a query that will iterate over
``(oid_int, tid_int, state_bytes)`` values for all the current objects.
Each current object is returned only once, at the transaction most recently
committed for it.
Returns a generator.
"""
self._iter_current_records.execute(cursor, {'start_oid': start_oid_int})
i_b = int64_to_8bytes
s = self._as_state
for oid_int, tid_int, state_bytes in cursor:
yield i_b(oid_int), i_b(tid_int), s(state_bytes) # pylint:disable=too-many-function-args


class _HistoryPreservingTransactionRecord(namedtuple(
'_HistoryPreservingTransactionRecord',
('tid_int', 'username', 'description', 'extension', 'packed')
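
For orientation, here is a minimal sketch (not part of this commit) of driving the new generator directly. It assumes an open RelStorage instance and reaches into the private ``_load_connection`` and ``_adapter`` attributes used by the storage method added later in this commit; it is for illustration only.

from ZODB.utils import u64 as bytes8_to_int64

def dump_current_oids(storage, start_oid_int=0):
    # Stream every current record without buffering the whole result set.
    with storage._load_connection.server_side_cursor() as ss_cursor:
        for oid, tid, state in storage._adapter.dbiter.iter_current_records(
                ss_cursor, start_oid_int):
            # oid and tid arrive as 8-byte values; state is the pickle bytes.
            print(bytes8_to_int64(oid), bytes8_to_int64(tid), len(state or b''))
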
15 changes: 15 additions & 0 deletions src/relstorage/adapters/interfaces.py
@@ -668,6 +668,21 @@ def iter_object_history(cursor, oid):
:raises KeyError: if the object does not exist
"""

def iter_current_records(cursor, start_oid_int=0):
"""
Cause the *cursor* (which should be a server-side cursor)
to execute a query that will iterate over
``(oid_int, tid_int, state_bytes)`` values for all the current objects.
Each current object is returned only once, at the transaction most recently
committed for it.
Returns a generator.
For compatibility with FileStorage, this must iterate in ascending
OID order; it must also accept an OID to begin with for compatibility with
zodbupdate.
"""

class ILocker(Interface):
"""Acquire and release the commit and pack locks."""
3 changes: 2 additions & 1 deletion src/relstorage/adapters/mysql/drivers/__init__.py
@@ -149,8 +149,9 @@ def __init__(self):

_server_side_cursor = None

def _make_cursor(self, conn, server_side=False):
def cursor(self, conn, server_side=False):
if server_side:
assert self._server_side_cursor is not None
cursor = conn.cursor(self._server_side_cursor)
cursor.arraysize = self.cursor_arraysize
else:
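
As background for the fix above, the difference the server-side cursor class makes can be sketched with plain MySQLdb; the connection parameters and the query below are placeholders, not RelStorage code.

import MySQLdb
import MySQLdb.cursors

conn = MySQLdb.connect(host='localhost', user='user', passwd='secret', db='relstorage')

# Default (client-side) cursor: executing a query buffers the entire result
# set in memory, which is what made packing and storage iteration expensive.
client_cur = conn.cursor()

# Server-side cursor: rows are streamed from the server as they are fetched,
# keeping memory use flat for large scans. The result must be fully consumed
# (or the cursor closed) before the connection is reused.
server_cur = conn.cursor(MySQLdb.cursors.SSCursor)
server_cur.execute("SELECT zoid, tid FROM object_state ORDER BY zoid")
for zoid, tid in server_cur:
    pass  # process each row without materializing the whole result
server_cur.close()
conn.close()
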
1 change: 1 addition & 0 deletions src/relstorage/interfaces.py
@@ -227,6 +227,7 @@ class IRelStorage(
ZODB.interfaces.IMultiCommitStorage, # mandatory in ZODB5, returns tid from tpc_finish.
ZODB.interfaces.IStorageRestoreable, # tpc_begin(tid=) and restore()
ZODB.interfaces.IStorageIteration, # iterator()
ZODB.interfaces.IStorageCurrentRecordIteration, # record_iternext()
ZODB.interfaces.ReadVerifyingStorage, # checkCurrentSerialInTransaction()
IMVCCDatabaseViewer,
):
68 changes: 68 additions & 0 deletions src/relstorage/storage/__init__.py
@@ -543,6 +543,74 @@ def iterator(self, start=None, stop=None):
return HistoryFreeTransactionIterator(
self._adapter, self._load_connection, start, stop)

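# Capture the builtin ``next``; ``record_iternext`` below shadows the
# name with its *next* parameter.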
__next = next

def __record_iternext_gen(self, start_oid_int):
with self._load_connection.server_side_cursor() as ss_cursor:
for record in self._adapter.dbiter.iter_current_records(ss_cursor, start_oid_int):
yield record

def record_iternext(self, next=None):
"""
Implementation of `ZODB.interfaces.IStorageCurrentRecordIteration`.

.. caution::

    You must completely consume the iteration.
"""
# The interface doesn't define any semantics for *next*; it's
# completely up to the storage and is just a magic cookie.
# However, zodbupdate assumes that the initial value for
# `next` is interpreted the way FileStorage interprets it: as an
# 8-byte OID specifying the object id to start with (inclusive).
#
# We need to detect the case that we're given an OID; that should only happen
# in the first call.
start_oid_int = 0
if isinstance(next, bytes):
import warnings
warnings.warn(
"There is no defined value for the *next* parameter. "
"RelStorage will rely on implementation-defined behaviour and treat it like "
"FileStorage does, while assuming that iteration is beginning from the start. "
"This may change in the future."
)
start_oid_int = bytes8_to_int64(next)
next = None

if next is None:
# Beginning.
cursor = self.__record_iternext_gen(start_oid_int)
# There *should* always be at least one object: the root.
# But if they passed a non-0 ``next`` value, that's not guaranteed.
# However, the protocol is not very good at expressing that possibility:
# The typical call looks like:
# oid, tid, data, next = storage.record_iternext(next)
# Unpacking will raise if we return None; most processing of *data* is not expecting
# None (it's common to wrap it in ``io.BytesIO()`` immediately; that's what zodbupdate
# does). Thus, it's probably best to let getting the first value simply raise.
oid, tid, state = self.__next(cursor)
# After that, we can treat it like it was given to us in *next*
next = oid, tid, state, cursor


# Somewhere in the middle, possibly the end.
#
# We have to operate one ahead (keep one buffered) because
# we need to be able to tell the caller not to call back.
oid, tid, state, cursor = next

try:
new_oid, new_tid, new_state = self.__next(cursor)
except StopIteration:
# Signal not to call back.
new_next = None
else:
new_next = new_oid, new_tid, new_state, cursor

return oid, tid, state, new_next


def afterCompletion(self):
# Note that this method exists mainly to deal with read-only
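
For reference, a minimal usage sketch (not part of this commit) of how a caller drives ``record_iternext``, following ``ZODB.interfaces.IStorageCurrentRecordIteration`` and the comments above; ``handle_record`` is a placeholder, and the iteration must be run to completion.

import io

def walk_current_records(storage, handle_record):
    # Walk every current object record exactly once, in ascending OID order.
    token = None
    while True:
        oid, tid, data, token = storage.record_iternext(token)
        handle_record(oid, tid, io.BytesIO(data))
        if token is None:
            # The storage signals completion by returning None for the token.
            break
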
5 changes: 3 additions & 2 deletions src/relstorage/storage/transaction_iterator.py
@@ -132,6 +132,7 @@ def close(self):
self._conn = None
super(HistoryPreservingTransactionIterator, self).close()


class HistoryFreeTransactionIterator(_TransactionIterator):
"""
Uses the given load connection cursor. Does not close the
@@ -211,7 +212,7 @@ def __init__(self, trans_iter, tid_int, user, desc, ext, packed):
TransactionRecord.__init__(self, tid, status, user, description, extension)

def __iter__(self):
return RecordIterator(self)
return TransactionRecordIterator(self)

def __repr__(self):
return '<%s at %x tid=%d status=%r user=%r description=%r>' % (
@@ -223,7 +224,7 @@ def __repr__(self):
self.description
)

class RecordIterator(object):
class TransactionRecordIterator(object):
"""Iterate over the objects in a transaction."""

__slots__ = (
61 changes: 61 additions & 0 deletions src/relstorage/tests/reltestbase.py
@@ -43,6 +43,7 @@
from ZODB.serialize import referencesf
from ZODB.utils import z64
from ZODB.utils import u64 as bytes8_to_int64
from ZODB.utils import p64 as int64_to_8bytes

from ZODB.tests import BasicStorage
from ZODB.tests import ConflictResolution
@@ -1385,6 +1386,66 @@ def checkCanLoadObjectStateWhileBeingModified(self):
storage1.close()
storage2.close()

###
# IStorageCurrentRecordIteration tests
###

def check_record_iternext_basic(self, start_oid_int=None):
# Based on code from FileStorage tests

db = DB(self._storage)
conn = db.open()
conn.root()['abc'] = MinPO('abc')
conn.root()['xyz'] = MinPO('xyz')
transaction.commit()

# Now, add some additional revisions. This proves that we iterate the latest records,
# not all transactions.
conn.root()['abc'].value = 'def'
conn.root()['xyz'].value = 'ghi'
transaction.commit()
conn.close()

storage2 = self._closing(self._storage.new_instance())

# The special case: convert to byte OID
token = None if start_oid_int is None else int64_to_8bytes(start_oid_int)

# (0, 1, 2) by default, or, e.g., (1, 2)
expected_oids = range(start_oid_int or 0, 3)
if not expected_oids:
assert start_oid_int > 3
# Call at least once.
expected_oids = (0,)
record_count = 0
for x in expected_oids:
oid, tid, data, next_token = self._storage.record_iternext(token)
record_count += 1
self.assertEqual(oid, int64_to_8bytes(x))
token = next_token
expected_data, expected_tid = storage2.load(oid)
self.assertEqual(expected_data, data)
self.assertEqual(expected_tid, tid)
if x == 2:
check_token = self.assertIsNone
else:
check_token = self.assertIsNotNone
check_token(token)
self.assertEqual(len(expected_oids), record_count)

def check_record_iternext_token_0(self):
# Passing a starting token.
self.check_record_iternext_basic(0)

def check_record_iternext_token_1(self):
# Gets a subset.
self.check_record_iternext_basic(1)

def check_record_iternext_too_large_oid(self):
with self.assertRaises(StopIteration):
self.check_record_iternext_basic(10)


class AbstractRSZodbConvertTests(StorageCreatingMixin,
FSZODBConvertTests,
# This one isn't cooperative in
