From 794eb8e3cf81dc061e2e47d1f379b39e2c9def76 Mon Sep 17 00:00:00 2001
From: Jason Madden
Date: Sun, 22 Sep 2019 12:30:51 -0500
Subject: [PATCH] Make copying transactions from history-free RelStorage use
 much less memory. And be safer: remove temp blobs when done with them.

---
 CHANGES.rst                                   |  10 ++
 src/relstorage/_compat.py                     |  14 +-
 src/relstorage/adapters/dbiter.py             | 134 ++++++++++++------
 src/relstorage/adapters/interfaces.py         |  28 ++--
 .../adapters/oracle/tests/test_dialect.py     |   3 +-
 src/relstorage/storage/__init__.py            |  13 +-
 src/relstorage/storage/copy.py                |  57 ++++++--
 src/relstorage/storage/history.py             |  32 ++---
 src/relstorage/storage/load.py                |   6 +-
 .../storage/transaction_iterator.py           |  25 ++--
 10 files changed, 219 insertions(+), 103 deletions(-)

diff --git a/CHANGES.rst b/CHANGES.rst
index 195e89be..8a505bea 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -51,6 +51,16 @@
   using the storage or ``sync()`` is called). This prevents data loss
   in some cases. See :issue:`344`.
 
+- Make copying transactions *from* a history-free RelStorage (e.g., with
+  ``zodbconvert``) require substantially less memory (75% less).
+
+- Make copying transactions *to* a RelStorage clean up temporary blob
+  files.
+
+- Make ``zodbconvert`` log progress at intervals instead of for every
+  transaction. Logging every transaction could add significant overhead
+  unless stdout was redirected to a file.
+
 3.0a10 (2019-09-04)
 ===================
 
diff --git a/src/relstorage/_compat.py b/src/relstorage/_compat.py
index 0ed42f94..4ff8f1f1 100644
--- a/src/relstorage/_compat.py
+++ b/src/relstorage/_compat.py
@@ -130,12 +130,12 @@ def OidSet_difference(c1, c2):
 
 OidSet_discard = set.discard
 
-# Lists of OIDs. These could be simple list() objects, or we can treat
-# them as numbers and store them in array.array objects, if we have an
-# unsigned 64-bit element type. array.array, just like the C version
-# of BTrees, uses less memory or CPython, but has a cost converting
-# back and forth between objects and native values. What's the cost?
-# Let's measure.
+# Lists of OIDs or TIDs. These could be simple list() objects, or we
+# can treat them as numbers and store them in array.array objects, if
+# we have an unsigned 64-bit element type. array.array, just like the
+# C version of BTrees, uses less memory on CPython, but has a cost
+# converting back and forth between objects and native values. What's
+# the cost? Let's measure.
 #
 # Test: list(xrange(30000000)) vs array.array('L', xrange(30000000))
 # on Python 2, with minor modifications (range and 'Q') on Python 3.
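# ----------------------------------------------------------------------
# Aside: a minimal sketch (not part of the patch) of the measurement the
# _compat.py comment above describes, written for Python 3. Absolute
# numbers vary by interpreter and platform; this only illustrates the
# list-vs-array comparison.
import array
import sys

N = 30000000
as_list = list(range(N))
as_array = array.array('Q', range(N))  # 'Q': unsigned 64-bit elements

# sys.getsizeof reports only the container. The list holds N pointers
# plus N separate int objects on CPython; the array stores N raw 8-byte
# values inline, so its container size is the whole story.
print('list container: %.0f MB' % (sys.getsizeof(as_list) / 1e6))
print('array total:    %.0f MB' % (sys.getsizeof(as_array) / 1e6))
# ----------------------------------------------------------------------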
@@ -211,7 +211,7 @@ def OidSet_difference(c1, c2):
     OidList = _64bit_array
 else:
     OidList = list
-
+TidList = OidList
 MAX_TID = BTrees.family64.maxint
 
 def iteroiditems(d):
diff --git a/src/relstorage/adapters/dbiter.py b/src/relstorage/adapters/dbiter.py
index f2fb4da5..98f8e13c 100644
--- a/src/relstorage/adapters/dbiter.py
+++ b/src/relstorage/adapters/dbiter.py
@@ -14,9 +14,13 @@
 from __future__ import absolute_import
 from __future__ import print_function
 
+from collections import namedtuple
+
 from zope.interface import implementer
 
 from relstorage._compat import MAX_TID
+from relstorage._compat import TidList
+from relstorage._util import Lazy
 
 from .interfaces import IDatabaseIterator
 from .schema import Schema
@@ -33,6 +37,14 @@ def __init__(self, database_driver):
         """
         self.driver = database_driver
 
+    @Lazy
+    def _as_state(self):
+        return self.driver.binary_column_as_state_type
+
+    @Lazy
+    def _as_bytes(self):
+        return self.driver.binary_column_as_bytes
+
     _iter_objects_query = Schema.object_state.select(
         it.c.zoid,
         it.c.state
@@ -48,10 +60,21 @@ def iter_objects(self, cursor, tid):
         Yields ``(oid, state)`` for each object in the transaction.
         """
         self._iter_objects_query.execute(cursor, {'tid': tid})
+        as_state = self._as_state
         for oid, state in cursor:
-            state = self.driver.binary_column_as_state_type(state)
+            state = as_state(state) # pylint:disable=too-many-function-args
             yield oid, state
 
+
+class _HistoryPreservingTransactionRecord(namedtuple(
+        '_HistoryPreservingTransactionRecord',
+        ('tid_int', 'username', 'description', 'extension', 'packed')
+)):
+    __slots__ = ()
+
+    @property
+    def pickle_size(self):
+        # The final column is overloaded: it is the ``packed`` flag in
+        # transaction queries and the state size in object-history queries.
+        return self.packed
+
 @implementer(IDatabaseIterator)
 class HistoryPreservingDatabaseIterator(DatabaseIterator):
@@ -62,8 +85,7 @@ def _transaction_iterator(self, cursor):
         """
         Iterate over a list of transactions returned from the database.
 
-        Each row begins with ``(tid, username, description, extension)``
-        and may have other columns.
+        Each row is ``(tid, username, description, extension, X)``, where
+        the final column is either the ``packed`` flag or the pickle
+        size, depending on the query.
         """
         # Iterating the cursor itself in a generator is not safe if
         # the cursor doesn't actually buffer all the rows *anyway*. If
         # the generator is abandoned before fetching all the
         # rows, a subsequent query or close operation can lead to
         # things like MySQL Connector/Python raising
         # InternalError(unread results)
-        rows = cursor.fetchall()
-        for row in rows:
-            tid, username, description, ext = row[:4]
-            # Although the transaction interface for username and description are
-            # defined as strings, this layer works with bytes. PY3.
-            username = self.driver.binary_column_as_bytes(username)
-            description = self.driver.binary_column_as_bytes(description)
-            ext = self.driver.binary_column_as_bytes(ext)
-
-            yield (tid, username, description, ext) + tuple(row[4:])
+        # Because we have it all in memory anyway, there's not much point in
+        # making this a generator.
+
+        # Although username and description are defined as strings by the
+        # transaction interface, this layer works with bytes. The ZODB layer
+        # does the conversion.
+        as_bytes = self._as_bytes
+        # pylint:disable=too-many-function-args
+        return [
+            _HistoryPreservingTransactionRecord(
+                tid,
+                as_bytes(username),
+                as_bytes(description),
+                as_bytes(ext),
+                packed
+            )
+            for (tid, username, description, ext, packed)
+            in cursor
+        ]
 
     _iter_transactions_query = Schema.transaction.select(
-        it.c.tid, it.c.username, it.c.description, it.c.extension
+        it.c.tid, it.c.username, it.c.description, it.c.extension, 0
     ).where(
         it.c.packed == False # pylint:disable=singleton-comparison
     ).and_(
@@ -116,11 +147,8 @@ def iter_transactions(self, cursor):
     )
 
     def iter_transactions_range(self, cursor, start=None, stop=None):
-        """Iterate over the transactions in the given range, oldest first.
-
-        Includes packed transactions.
-        Yields (tid, username, description, extension, packed)
-        for each transaction.
+        """
+        See `IDatabaseIterator`.
         """
         params = {
             'min_tid': start if start else 0,
@@ -149,11 +177,9 @@ def iter_transactions_range(self, cursor, start=None, stop=None):
     )
 
     def iter_object_history(self, cursor, oid):
-        """Iterate over an object's history.
-
+        """
+        See `IDatabaseIterator`.
+
         Raises KeyError if the object does not exist.
-        Yields (tid, username, description, extension, pickle_size)
-        for each modification.
         """
         params = {'oid': oid}
         self._object_exists_query.execute(cursor, params)
@@ -163,6 +189,42 @@
         self._object_history_query.execute(cursor, params)
         return self._transaction_iterator(cursor)
 
+
+class _HistoryFreeTransactionRecord(object):
+    __slots__ = ('tid_int',)
+
+    username = b''
+    description = b''
+    extension = b''
+    packed = True
+
+    def __init__(self, tid):
+        self.tid_int = tid
+
+
+class _HistoryFreeObjectHistoryRecord(_HistoryFreeTransactionRecord):
+    __slots__ = ('pickle_size',)
+
+    def __init__(self, tid, size):
+        _HistoryFreeTransactionRecord.__init__(self, tid)
+        self.pickle_size = size
+
+
+class _HistoryFreeTransactionRange(object):
+    # By storing just the int, and materializing the records on demand, we
+    # save substantial amounts of memory. For example, 18MM records on PyPy
+    # went from about 3.5GB to about 0.5GB.
+    __slots__ = (
+        'tid_ints',
+    )
+
+    def __init__(self, tid_ints):
+        self.tid_ints = tid_ints
+
+    def __len__(self):
+        return len(self.tid_ints)
+
+    def __getitem__(self, ix):
+        return _HistoryFreeTransactionRecord(self.tid_ints[ix])
+
 @implementer(IDatabaseIterator)
 class HistoryFreeDatabaseIterator(DatabaseIterator):
@@ -170,15 +232,11 @@ class HistoryFreeDatabaseIterator(DatabaseIterator):
 
     keep_history = False
 
     def iter_transactions(self, cursor):
-        """Iterate over the transaction log, newest first.
-
-        Skips packed transactions.
-        Yields ``(tid, username, description, extension)`` for each transaction.
-
+        """
         This always returns an empty iterable.
         """
         # pylint:disable=unused-argument
-        return []
+        return ()
 
     _iter_transactions_range_query = Schema.object_state.select(
         it.c.tid,
@@ -191,18 +249,15 @@ def iter_transactions(self, cursor):
     ).distinct()
 
     def iter_transactions_range(self, cursor, start=None, stop=None):
-        """Iterate over the transactions in the given range, oldest first.
-
-        Includes packed transactions.
-        Yields ``(tid, username, description, extension, packed)``
-        for each transaction.
+        """
+        See `IDatabaseIterator`.
""" params = { 'min_tid': start if start else 0, 'max_tid': stop if stop else MAX_TID } self._iter_transactions_range_query.execute(cursor, params) - return ((tid, b'', b'', b'', True) for (tid,) in cursor) + return _HistoryFreeTransactionRange(TidList((tid for (tid,) in cursor))) _iter_object_history_query = Schema.object_state.select( it.c.tid, it.c.state_size @@ -212,15 +267,14 @@ def iter_transactions_range(self, cursor, start=None, stop=None): def iter_object_history(self, cursor, oid): """ - Iterate over an object's history. + See `IDatabaseIterator` - Raises KeyError if the object does not exist. - Yields a single row, - ``(tid, username, description, extension, pickle_size)`` + Yields a single row. """ self._iter_object_history_query.execute(cursor, {'oid': oid}) rows = cursor.fetchall() if not rows: raise KeyError(oid) assert len(rows) == 1 - return [(tid, '', '', b'', size) for (tid, size) in rows] + tid, size = rows[0] + return [_HistoryFreeObjectHistoryRecord(tid, size)] diff --git a/src/relstorage/adapters/interfaces.py b/src/relstorage/adapters/interfaces.py index 2c0c43f4..2315431e 100644 --- a/src/relstorage/adapters/interfaces.py +++ b/src/relstorage/adapters/interfaces.py @@ -551,26 +551,34 @@ def iter_objects(cursor, tid): """ def iter_transactions(cursor): - """Iterate over the transaction log, newest first. + """ + Iterate over the transaction log, newest first. - Skips packed transactions. - Yields (tid, username, description, extension) for each transaction. + Skips packed transactions. Yields (tid, username, description, + extension) for each transaction. """ def iter_transactions_range(cursor, start=None, stop=None): - """Iterate over the transactions in the given range, oldest first. + """ + Return an indexable object over the transactions in the given range, oldest + first. Includes packed transactions. - Yields (tid, username, description, extension, packed) - for each transaction. + + Has an object with the properties ``tid_int``, ``username`` + (bytes) ``description`` (bytes) ``extension`` (bytes) and + ``packed`` (boolean) for each transaction. """ def iter_object_history(cursor, oid): - """Iterate over an object's history. + """ + Iterate over an object's history. + + Yields an object with the properties ``tid_int``, ``username`` + (bytes) ``description`` (bytes) ``extension`` (bytes) and + ``pickle_size`` (int) for each transaction. - Raises KeyError if the object does not exist. - Yields (tid, username, description, extension, state_size) - for each modification. 
+        :raises KeyError: if the object does not exist
         """
 
diff --git a/src/relstorage/adapters/oracle/tests/test_dialect.py b/src/relstorage/adapters/oracle/tests/test_dialect.py
index 8fe43559..e8f11753 100644
--- a/src/relstorage/adapters/oracle/tests/test_dialect.py
+++ b/src/relstorage/adapters/oracle/tests/test_dialect.py
@@ -22,6 +22,7 @@ class Context(object):
 
 class Driver(object):
     dialect = OracleDialect()
+    binary_column_as_state_type = binary_column_as_bytes = lambda b: b
 
 class TestOracleDialect(TestCase):
 
@@ -170,7 +171,7 @@ def test_iter_transactions(self):
 
         self.assertEqual(
             stmt,
-            'SELECT tid, username, description, extension '
+            'SELECT tid, username, description, extension, 0 '
             'FROM transaction '
             "WHERE ((packed = 'N' AND tid <> :literal_0)) "
             'ORDER BY tid DESC'
         )
diff --git a/src/relstorage/storage/__init__.py b/src/relstorage/storage/__init__.py
index d0fbdd13..d49cd46a 100644
--- a/src/relstorage/storage/__init__.py
+++ b/src/relstorage/storage/__init__.py
@@ -423,8 +423,12 @@ def registerDB(self, wrapper):
         if hasattr(wrapper, 'base') and hasattr(wrapper, 'copied_methods'):
             type(wrapper).new_instance = _zlibstorage_new_instance
             type(wrapper).pack = _zlibstorage_pack
-            # NOTE that zlibstorage has a custom copyTransactionsFrom that hides
-            # our own implementation.
+            from zc.zlibstorage import _Iterator
+            _Iterator.__len__ = _zlibstorage_Iterator_len
+            # zc.zlibstorage has a custom copyTransactionsFrom that hides
+            # our own implementation. It just uses ZODB.blob.copyTransactionsFromTo.
+            # Use our implementation instead.
+            wrapper.copyTransactionsFrom = self.copyTransactionsFrom
         else:
             wrapper.new_instance = lambda s: type(wrapper)(self.new_instance())
 
@@ -535,7 +539,7 @@ def iterator(self, start=None, stop=None):
         if self.keep_history:
             return HistoryPreservingTransactionIterator(self._adapter, start, stop)
         return HistoryFreeTransactionIterator(
-            self._adapter, self._load_connection.cursor, start, stop)
+            self._adapter, self._load_connection, start, stop)
 
 
     def afterCompletion(self):
@@ -835,3 +839,6 @@ def _zlibstorage_pack(self, pack_time, referencesf, *args, **kwargs):
     def refs(state, oids=None):
         return referencesf(untransform(state), oids)
     return self.base.pack(pack_time, refs, *args, **kwargs)
+
+def _zlibstorage_Iterator_len(self):
+    return len(self._base_it)
diff --git a/src/relstorage/storage/copy.py b/src/relstorage/storage/copy.py
index 110d2326..40969638 100644
--- a/src/relstorage/storage/copy.py
+++ b/src/relstorage/storage/copy.py
@@ -45,10 +45,17 @@ def __init__(self, blobhelper, tpc, restore):
         self.tpc = tpc
         self.restore = restore
 
+    # Time in seconds between progress logging.
+    log_interval = 60
+
+    # Number of transactions to copy before checking if we should log.
+    log_count = 10
+
     def copyTransactionsFrom(self, other):
-        # pylint:disable=too-many-locals,too-many-statements
+        # pylint:disable=too-many-locals,too-many-statements,too-many-branches
         # adapted from ZODB.blob.BlobStorageMixin
         begin_time = time.time()
+        log_at = begin_time + self.log_interval
         txnum = 0
         total_size = 0
         blobhelper = self.blobhelper
@@ -57,6 +64,7 @@
 
         logger.info("Counting the transactions to copy.")
         other_it = other.iterator()
+        logger.debug("Opened the other iterator: %s", other_it)
         try:
             num_txns = len(other_it)
             if num_txns == 0:
@@ -64,6 +72,7 @@
                 # Try the other path.
                 raise TypeError()
         except TypeError:
+            logger.debug("Iterator %s doesn't support len()", other_it)
             num_txns = 0
             for _ in other_it:
                 num_txns += 1
@@ -71,6 +80,7 @@
             other_it = other.iterator()
 
         logger.info("Copying %d transactions", num_txns)
+        tmp_blobs_to_rm = []
         for trans in other_it:
             txnum += 1
             num_txn_records = 0
@@ -87,9 +97,13 @@
                 if blobfile is not None:
                     fd, name = tempfile.mkstemp(
                         suffix='.tmp',
-                        dir=blobhelper.temporaryDirectory())
-                    os.close(fd)
-                    with open(name, 'wb') as target:
+                        dir=blobhelper.temporaryDirectory()
+                    )
+                    tmp_blobs_to_rm.append(name)
+                    logger.debug("Copying %s to temporary blob file %s for upload",
+                                 blobfile, name)
+
+                    with os.fdopen(fd, 'wb') as target:
                         copy_blob(blobfile, target)
                     blobfile.close()
                     restore.restoreBlob(record.oid, record.tid, record.data,
@@ -103,17 +117,30 @@
             tpc.tpc_vote(trans)
             tpc.tpc_finish(trans)
 
-            pct_complete = '%1.2f%%' % (txnum * 100.0 / num_txns)
-            elapsed = time.time() - begin_time
-            if elapsed:
-                rate = total_size / 1e6 / elapsed
-            else:
-                rate = 0.0
-            rate_str = '%1.3f' % rate
-            logger.info(
-                "Copied tid %d,%5d records | %6s MB/s (%6d/%6d,%7s)",
-                bytes8_to_int64(trans.tid), num_txn_records, rate_str,
-                txnum, num_txns, pct_complete)
+            for tmp_blob in tmp_blobs_to_rm:
+                logger.debug("Removing temporary blob file %s", tmp_blob)
+                try:
+                    os.unlink(tmp_blob)
+                except OSError:
+                    pass
+            del tmp_blobs_to_rm[:]
+
+            if txnum % self.log_count == 0 and time.time() > log_at:
+                now = time.time()
+                log_at = now + self.log_interval
+
+                pct_complete = '%1.2f%%' % (txnum * 100.0 / num_txns)
+                elapsed = now - begin_time
+                if elapsed:
+                    rate = total_size / 1e6 / elapsed
+                else:
+                    rate = 0.0
+                rate_str = '%1.3f' % rate
+
+                logger.info(
+                    "Copied tid %d,%5d records | %6s MB/s (%6d/%6d,%7s)",
+                    bytes8_to_int64(trans.tid), num_txn_records, rate_str,
+                    txnum, num_txns, pct_complete)
 
         elapsed = time.time() - begin_time
         logger.info(
diff --git a/src/relstorage/storage/history.py b/src/relstorage/storage/history.py
index 38f6f44a..583e7220 100644
--- a/src/relstorage/storage/history.py
+++ b/src/relstorage/storage/history.py
@@ -61,26 +61,26 @@ def history(self, oid, version=None, size=1, filter=None):
         cursor = self.load_connection.cursor
         oid_int = bytes8_to_int64(oid)
         try:
-            rows = self.adapter.dbiter.iter_object_history(
+            history = self.adapter.dbiter.iter_object_history(
                 cursor, oid_int)
         except KeyError:
             raise POSKeyError(oid)
 
         res = []
-        for tid_int, username, description, extension, length in rows:
-            tid = int64_to_8bytes(tid_int)
-            if extension:
-                d = loads(extension)
+        for entry in history:
+            tid = int64_to_8bytes(entry.tid_int)
+            if entry.extension:
+                d = loads(entry.extension)
             else:
                 d = {}
             d.update({
                 "time": TimeStamp(tid).timeTime(),
-                "user_name": username or b'',
-                "description": description or b'',
+                "user_name": entry.username or b'',
+                "description": entry.description or b'',
                 "tid": tid,
                 "version": '',
-                "size": length,
-                "rs_tid_int": tid_int,
+                "size": entry.pickle_size,
+                "rs_tid_int": entry.tid_int,
                 "rs_oid_int": oid_int,
             })
             if filter is None or filter(d):
@@ -123,11 +123,11 @@ def undoLog(self, first=0, last=-20, filter=None):
         adapter = self.adapter
         conn, cursor = adapter.connmanager.open()
         try:
-            rows = adapter.dbiter.iter_transactions(cursor)
+            tx_iter = adapter.dbiter.iter_transactions(cursor)
             i = 0
             res = []
-            for tid_int, user, desc, ext in rows:
-                tid = int64_to_8bytes(tid_int)
+            for tx in tx_iter:
+                tid = int64_to_8bytes(tx.tid_int)
                 # Note that user and desc are schizophrenic. The transaction
                 # interface specifies that they are a Python str, *probably*
                 # meaning bytes. But code in the wild and the ZODB test suite
@@ -138,11 +138,11 @@ def undoLog(self, first=0, last=-20, filter=None):
                 d = {
                     'id': base64_encodebytes(tid)[:-1], # pylint:disable=deprecated-method
                     'time': TimeStamp(tid).timeTime(),
-                    'user_name': user or b'',
-                    'description': desc or b'',
+                    'user_name': tx.username or b'',
+                    'description': tx.description or b'',
                 }
-                if ext:
-                    d.update(loads(ext))
+                if tx.extension:
+                    d.update(loads(tx.extension))
 
                 if filter is None or filter(d):
                     if i >= first:
diff --git a/src/relstorage/storage/load.py b/src/relstorage/storage/load.py
index 61a7723f..f38fa68b 100644
--- a/src/relstorage/storage/load.py
+++ b/src/relstorage/storage/load.py
@@ -50,15 +50,15 @@ def _make_pke_data(cursor, adapter, oid_int, reason):
 
     tids = []
     try:
-        rows = adapter.dbiter.iter_object_history(cursor, oid_int)
+        history = adapter.dbiter.iter_object_history(cursor, oid_int)
     except KeyError as e:
         # The object has no history, at least from the point of view
         # of the current database load connection.
         tids = str(e)
         del e
     else:
-        for row in rows:
-            tids.append(row[0])
+        for entry in history:
+            tids.append(entry.tid_int)
             if len(tids) >= 10:
                 break
     extra['recent_tids'] = tids
diff --git a/src/relstorage/storage/transaction_iterator.py b/src/relstorage/storage/transaction_iterator.py
index 1b648699..93e10ce9 100644
--- a/src/relstorage/storage/transaction_iterator.py
+++ b/src/relstorage/storage/transaction_iterator.py
@@ -27,6 +27,7 @@
 from ZODB.utils import u64 as bytes8_to_int64
 
 from relstorage._compat import loads
+from relstorage.adapters.connections import LoadConnection
 
 logger = __import__('logging').getLogger(__name__)
 
@@ -44,9 +45,9 @@ class _TransactionIterator(object):
         '_index',
     )
 
-    def __init__(self, adapter, cursor, start, stop):
+    def __init__(self, adapter, load_connection, start, stop):
         self._adapter = adapter
-        self._cursor = cursor
+        self._cursor = load_connection.cursor
         self._closed = False
 
         if start is not None:
@@ -59,8 +60,9 @@ def __init__(self, adapter, cursor, start, stop):
             stop_int = None
 
-        # _transactions: [(tid, username, description, extension, packed)]
+        # _transactions: a sequence of record objects; see iter_transactions_range.
-        self._transactions = list(adapter.dbiter.iter_transactions_range(
-            self._cursor, start_int, stop_int))
+        with load_connection.server_side_cursor() as cursor:
+            self._transactions = adapter.dbiter.iter_transactions_range(
+                cursor, start_int, stop_int)
         self._index = 0
 
     def close(self):
@@ -93,7 +95,14 @@ def next(self):
         if self._closed:
             raise IOError("TransactionIterator already closed")
         params = self._transactions[self._index]
-        res = RelStorageTransactionRecord(self, *params)
+        res = RelStorageTransactionRecord(
+            self,
+            params.tid_int,
+            params.username,
+            params.description,
+            params.extension,
+            params.packed
+        )
         self._index += 1
         return res
 
@@ -111,14 +120,14 @@ class HistoryPreservingTransactionIterator(_TransactionIterator):
     )
 
     def __init__(self, adapter, start, stop):
-        self._conn, cursor = adapter.connmanager.open_for_load()
+        self._conn = load_connection = LoadConnection(adapter.connmanager)
         super(HistoryPreservingTransactionIterator, self).__init__(
-            adapter, cursor, start, stop)
+            adapter, load_connection, start, stop)
 
     def close(self):
         try:
             if self._conn is not None:
-                self._adapter.connmanager.close(self._conn, self._cursor)
+                self._conn.drop()
         finally:
             self._conn = None
         super(HistoryPreservingTransactionIterator, self).close()
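# ----------------------------------------------------------------------
# Aside: a minimal sketch (not part of the patch) of the on-demand
# record materialization this commit introduces for history-free
# storages. The names here are hypothetical stand-ins for
# _HistoryFreeTransactionRecord and _HistoryFreeTransactionRange: only
# the 64-bit tid integers stay resident (in an array.array via
# TidList); a full record object is built per access and then
# discarded, which is what shrank 18MM records from ~3.5GB to ~0.5GB
# on PyPy.
from array import array


class TransactionRecord(object):
    # The constant metadata lives on the class; only the tid is stored
    # per instance.
    __slots__ = ('tid_int',)

    username = b''
    description = b''
    extension = b''
    packed = True

    def __init__(self, tid_int):
        self.tid_int = tid_int


class TransactionRange(object):
    __slots__ = ('tid_ints',)

    def __init__(self, tid_ints):
        self.tid_ints = tid_ints

    def __len__(self):
        # Lets zodbconvert compute the "Copying %d transactions" total
        # without a second pass over the iterator.
        return len(self.tid_ints)

    def __getitem__(self, ix):
        # Materialized on demand; the caller drops it after use.
        return TransactionRecord(self.tid_ints[ix])


txns = TransactionRange(array('Q', (1001, 1002, 1003)))
print(len(txns))        # 3
print(txns[0].tid_int)  # 1001
# ----------------------------------------------------------------------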