diff --git a/CHANGES.rst b/CHANGES.rst
index a5fbc524..c53ca62f 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -5,7 +5,12 @@
 3.0a4 (unreleased)
 ==================

-- Nothing changed yet.
+- Add support for the ZODB 5 ``connection.prefetch(*args)`` API. This
+  takes either OIDs (``obj._p_oid``) or persistent ghost objects, or
+  an iterator of those things, and asks the storage to load them into
+  its cache for use in the future. In RelStorage, this uses the shared
+  cache and so may be useful for more than one thread. This can be
+  3x or more faster than loading objects on demand. See :issue:`239`.


 3.0a3 (2019-06-26)
diff --git a/src/relstorage/adapters/batch.py b/src/relstorage/adapters/batch.py
index 8fabb3f6..6ed69fff 100644
--- a/src/relstorage/adapters/batch.py
+++ b/src/relstorage/adapters/batch.py
@@ -86,21 +86,19 @@ def select_from(self, columns, table, **kw):
         The keyword arguments should be of length 1, containing
         an iterable of the values to check.

-        Returns a list of matching rows.
+        Returns an iterator of matching rows.
         """
         assert len(kw) == 1
         filter_column, filter_values = kw.popitem()
         filter_values = list(filter_values)
-        results = []
         command = 'SELECT %s' % (','.join(columns),)
         while filter_values:
             filter_subset = filter_values[:self.row_limit]
             del filter_values[:self.row_limit]
             descriptor = [[(table, (filter_column,)), filter_subset]]
             self._do_batch(command, descriptor, rows_need_flattened=False)
-            results.extend(self.cursor.fetchall())
-        return results
-
+            for row in self.cursor.fetchall():
+                yield row

     def flush(self):
         if self.deletes:
diff --git a/src/relstorage/adapters/interfaces.py b/src/relstorage/adapters/interfaces.py
index cd214000..4a49cc1a 100644
--- a/src/relstorage/adapters/interfaces.py
+++ b/src/relstorage/adapters/interfaces.py
@@ -353,9 +353,20 @@ class IObjectMover(Interface):
     """Move object states to/from the database and within the database."""

     def load_current(cursor, oid):
-        """Returns the current state and integer tid for an object.
+        """
+        Returns the current state and integer tid for an object.
+
+        *oid* is an integer. Returns (None, None) if object does not
+        exist.
+        """
+
+    def load_currents(cursor, oids):
+        """
+        Returns the oid integer, state, and integer tid for all the specified
+        objects.

-        oid is an integer. Returns (None, None) if object does not exist.
+        *oids* is an iterable of integers. If any objects do not exist,
+        they are ignored.
""" def load_revision(cursor, oid, tid): diff --git a/src/relstorage/adapters/mover.py b/src/relstorage/adapters/mover.py index 73fe3dfb..2c4fb386 100644 --- a/src/relstorage/adapters/mover.py +++ b/src/relstorage/adapters/mover.py @@ -59,7 +59,7 @@ def _compute_md5sum(self, data): """ SELECT state, tid FROM current_object - JOIN object_state USING(zoid, tid) + JOIN object_state USING(zoid, tid) WHERE zoid = %s """, """ @@ -93,6 +93,24 @@ def load_current(self, cursor, oid): return None, None + _load_currents_queries = ( + (('zoid', 'state', 'tid'), 'current_object JOIN object_state USING(zoid, tid)', 'zoid'), + (('zoid', 'state', 'tid'), 'object_state', 'zoid'), + ) + + _load_currents_query = _query_property('_load_currents') + + @metricmethod_sampled + def load_currents(self, cursor, oids): + """Returns the current {oid: tid} for specified object ids.""" + columns, table, filter_column = self._load_currents_query + binary_column_as_state_type = self.driver.binary_column_as_state_type + batcher = self.make_batcher(cursor, row_limit=1000) + rows = batcher.select_from(columns, table, **{filter_column: oids}) + for row in rows: + oid, state, tid = row + yield oid, binary_column_as_state_type(state), tid + _load_revision_query = """ SELECT state FROM object_state @@ -189,7 +207,7 @@ def current_object_tids(self, cursor, oids): columns, table, filter_column = self._current_object_tids_query batcher = self.make_batcher(cursor, row_limit=1000) rows = batcher.select_from(columns, table, **{filter_column: oids}) - res = self._current_object_tids_map_type(rows) + res = self._current_object_tids_map_type(list(rows)) return res #: A sequence of *names* of attributes on this object that are statements to be diff --git a/src/relstorage/adapters/postgresql/tests/test_mover.py b/src/relstorage/adapters/postgresql/tests/test_mover.py index 0f9dc42c..be930cc0 100644 --- a/src/relstorage/adapters/postgresql/tests/test_mover.py +++ b/src/relstorage/adapters/postgresql/tests/test_mover.py @@ -72,7 +72,7 @@ def test_prepare_load_current(self): [ 'PREPARE load_current (BIGINT) AS SELECT state, tid\n' ' FROM current_object\n' - ' JOIN object_state USING(zoid, tid)\n' + ' JOIN object_state USING(zoid, tid)\n' ' WHERE zoid = $1', 'PREPARE load_current (BIGINT) AS SELECT state, tid\n' ' FROM object_state\n' diff --git a/src/relstorage/adapters/tests/test_batch.py b/src/relstorage/adapters/tests/test_batch.py index 5548e8fe..813f2905 100644 --- a/src/relstorage/adapters/tests/test_batch.py +++ b/src/relstorage/adapters/tests/test_batch.py @@ -247,7 +247,7 @@ def test_flush(self): def test_select_one(self): cursor = MockCursor() batcher = self.getClass()(cursor) - batcher.select_from(('zoid', 'tid'), 'object_state', oids=(1,)) + list(batcher.select_from(('zoid', 'tid'), 'object_state', oids=(1,))) self.assertEqual(cursor.executed, [ ('SELECT zoid,tid FROM object_state WHERE oids IN (%s)', (1,)) @@ -256,8 +256,8 @@ def test_select_one(self): def test_select_multiple_one_batch(self): cursor = MockCursor() batcher = self.getClass()(cursor) - batcher.select_from(('zoid', 'tid'), 'object_state', - oids=(1, 2, 3, 4)) + list(batcher.select_from(('zoid', 'tid'), 'object_state', + oids=(1, 2, 3, 4))) self.assertEqual(cursor.executed, [ ('SELECT zoid,tid FROM object_state WHERE oids IN (%s,%s,%s,%s)', (1, 2, 3, 4)) @@ -274,6 +274,7 @@ def test_select_multiple_many_batch(self): batcher.row_limit = 2 rows = batcher.select_from(('zoid', 'tid'), 'object_state', oids=(1, 2, 3, 4, 5)) + rows = list(rows) 
         self.assertEqual(cursor.executed, [
             ('SELECT zoid,tid FROM object_state WHERE oids IN (%s,%s)',
              (1, 2,)),
@@ -402,7 +403,7 @@ def getClass(self):
     def test_select_one(self):
         cursor = MockCursor()
         batcher = self.getClass()(cursor)
-        batcher.select_from(('zoid', 'tid'), 'object_state', oids=(1,))
+        list(batcher.select_from(('zoid', 'tid'), 'object_state', oids=(1,)))
         self.assertEqual(cursor.executed, [
             ('SELECT zoid,tid FROM object_state WHERE oids = ANY (%s)',
              ([1,],))
@@ -411,8 +412,8 @@ def test_select_multiple_one_batch(self):
         cursor = MockCursor()
         batcher = self.getClass()(cursor)
-        batcher.select_from(('zoid', 'tid'), 'object_state',
-                            oids=(1, 2, 3, 4))
+        list(batcher.select_from(('zoid', 'tid'), 'object_state',
+                                 oids=(1, 2, 3, 4)))
         self.assertEqual(cursor.executed, [
             ('SELECT zoid,tid FROM object_state WHERE oids = ANY (%s)',
              ([1, 2, 3, 4],))
@@ -429,6 +430,7 @@ def test_select_multiple_many_batch(self):
         batcher.row_limit = 2
         rows = batcher.select_from(('zoid', 'tid'), 'object_state',
                                    oids=(1, 2, 3, 4, 5))
+        rows = list(rows)
         self.assertEqual(cursor.executed, [
             ('SELECT zoid,tid FROM object_state WHERE oids = ANY (%s)',
              ([1, 2,],)),
diff --git a/src/relstorage/cache/storage_cache.py b/src/relstorage/cache/storage_cache.py
index ee829a43..3f6aaa6e 100644
--- a/src/relstorage/cache/storage_cache.py
+++ b/src/relstorage/cache/storage_cache.py
@@ -15,6 +15,8 @@
 from __future__ import division
 from __future__ import print_function

+# pylint:disable=too-many-lines
+
 import logging
 import os
 import threading
@@ -466,7 +468,6 @@ def load(self, cursor, oid_int):
         # Make a list of cache keys to query. The list will have either
         # 1 or 2 keys.
         cp0, cp1 = self.checkpoints
-        tid1 = cp0
         tid2 = None
         tid_int = self.delta_after1.get(oid_int)
         if tid_int:
@@ -474,12 +475,12 @@ def load(self, cursor, oid_int):
         elif cp1 != cp0:
             tid2 = cp1

-        preferred_key = (oid_int, tid1)
+        preferred_key = (oid_int, cp0)

         # Query the cache. Query multiple keys simultaneously to
         # minimize latency. The client is responsible for moving
         # the data to the preferred key if it wasn't found there.
-        response = cache(oid_int, tid1, tid2)
+        response = cache(oid_int, cp0, tid2)
         if response: # We have a hit!
             state, actual_tid = response
             return state, actual_tid
@@ -491,6 +492,51 @@ def load(self, cursor, oid_int):
         cache[preferred_key] = (state, tid_int)
         return state, tid_int

+    def prefetch(self, cursor, oid_ints):
+        # Just like load(), but we only fetch the OIDs
+        # we can't find in the cache.
+        if not self.checkpoints:
+            # No point even trying, we would just throw the results away
+            return
+
+        to_fetch = OID_OBJECT_MAP_TYPE() # {oid: cache key}
+        cache = self.cache
+        cp0, cp1 = self.checkpoints
+        delta_after0 = self.delta_after0.get
+        delta_after1 = self.delta_after1.get
+        for oid_int in oid_ints:
+            tid_int = delta_after0(oid_int)
+            if tid_int:
+                key = (oid_int, tid_int)
+                cache_data = cache[key]
+                if not cache_data:
+                    # That was our one place, so we must fetch
+                    to_fetch[oid_int] = key
+                continue
+
+            tid2 = None
+            tid_int = delta_after1(oid_int)
+            if tid_int:
+                tid2 = tid_int
+            elif cp1 != cp0:
+                tid2 = cp1
+
+            cache_data = cache(oid_int, cp0, tid2)
+            if not cache_data:
+                preferred_key = (oid_int, cp0)
+                to_fetch[oid_int] = preferred_key
+
+        if not to_fetch:
+            return
+
+        for oid, state, tid_int in self.adapter.mover.load_currents(cursor, to_fetch):
+            key = to_fetch[oid]
+            # Note that we're losing the knowledge of whether the TID
+            # in the key came from delta_after0 or not, so we're not
+            # validating that part.
+            self._check_tid_after_load(oid, tid_int)
+            cache[key] = (state, tid_int)
+
     def tpc_begin(self):
         """Prepare temp space for objects to cache."""
         q = self.temp_objects = _TemporaryStorage()
diff --git a/src/relstorage/storage.py b/src/relstorage/storage.py
index fa229a60..cb071164 100644
--- a/src/relstorage/storage.py
+++ b/src/relstorage/storage.py
@@ -555,25 +555,28 @@ def _before_load(self):
             self._restart_load_and_poll()
         assert self._load_transaction_open == 'active'

-    @Metric(method=True, rate=0.1)
-    def load(self, oid, version=''):
-        # pylint:disable=unused-argument
+    def __load_using_method(self, meth, argument):
         if self._stale_error is not None:
             raise self._stale_error

-        oid_int = bytes8_to_int64(oid)
-        cache = self._cache
-
         with self._lock:
             self._before_load()
             cursor = self._load_cursor
             try:
-                state, tid_int = cache.load(cursor, oid_int)
+                return meth(cursor, argument)
             except CacheConsistencyError:
                 log.exception("Cache consistency error; restarting load")
                 self._drop_load_connection()
                 raise

+    @Metric(method=True, rate=0.1)
+    def load(self, oid, version=''):
+        # pylint:disable=unused-argument
+
+        oid_int = bytes8_to_int64(oid)
+
+        state, tid_int = self.__load_using_method(self._cache.load, oid_int)
+
         if tid_int is None:
             self._log_keyerror(oid_int, "no tid found")
             raise POSKeyError(oid)
@@ -585,11 +588,20 @@ def load(self, oid, version=''):
             raise POSKeyError(oid)
         return state, int64_to_8bytes(tid_int)

-    def getTid(self, oid):
-        if self._stale_error is not None:
-            raise self._stale_error
+    def prefetch(self, oids):
+        prefetch = self._cache.prefetch
+        oid_ints = [bytes8_to_int64(oid) for oid in oids]
+        try:
+            self.__load_using_method(prefetch, oid_ints)
+        except Exception: # pylint:disable=broad-except
+            # This could raise self._stale_error, or
+            # CacheConsistencyError. Both of those mean that regular loads
+            # may fail too, but we don't know what our transaction state is
+            # at this time, so we don't want to raise it to the caller.
+            log.exception("Failed to prefetch")

-        _state, serial = self.load(oid, '')
+    def getTid(self, oid):
+        _state, serial = self.load(oid)
         return serial

     def loadEx(self, oid, version=''):
@@ -633,7 +645,7 @@ def loadBefore(self, oid, tid):
             else:
                 self._before_load()
                 cursor = self._load_cursor
-                if not self._adapter.mover.exists(cursor, bytes8_to_int64(oid)):
+                if not self._adapter.mover.exists(cursor, oid_int):
                     raise POSKeyError(oid)

             state, start_tid = self._adapter.mover.load_before(
diff --git a/src/relstorage/tests/reltestbase.py b/src/relstorage/tests/reltestbase.py
index eb6fa0b9..b11166ea 100644
--- a/src/relstorage/tests/reltestbase.py
+++ b/src/relstorage/tests/reltestbase.py
@@ -29,6 +29,7 @@
 from zc.zlibstorage import ZlibStorage

 import ZODB.tests.util
+from ZODB.utils import z64
 from ZODB.DB import DB
 from ZODB.FileStorage import FileStorage
 from ZODB.POSException import ReadConflictError
@@ -922,6 +923,26 @@ def checkGeventSwitchesOnOpen(self):
         with assert_switches():
             self.open()

+    def checkPrefetch(self):
+        db = DB(self._storage)
+        conn = db.open()
+
+        mapping = conn.root()['key'] = PersistentMapping()
+
+        transaction.commit()
+
+        self.assertEqual(3, len(self._storage._cache))
+        self._storage._cache.clear()
+        self.assertEmpty(self._storage._cache)
+
+        conn.prefetch(z64, mapping)
+
+        self.assertEqual(2, len(self._storage._cache))
+
+        # second time is a no-op
+        conn.prefetch(z64, mapping)
+        self.assertEqual(2, len(self._storage._cache))
+

 class AbstractRSZodbConvertTests(StorageCreatingMixin,
                                  FSZODBConvertTests,
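
Usage note (illustrative, not part of the patch): the sketch below shows how the API added above is driven from application code, mirroring the checkPrefetch test. It assumes a ZODB.DB instance backed by RelStorage that is configured elsewhere; the warm_cache helper name is hypothetical. ZODB 5's Connection.prefetch accepts OIDs and/or persistent (ghost) objects and forwards them to RelStorage.prefetch, which warms the shared storage cache in a batched query instead of loading each object lazily on first attribute access.

# Illustrative sketch only (not part of the patch). *db* is assumed to be
# an open ZODB.DB backed by RelStorage, configured elsewhere; warm_cache
# is a hypothetical helper used for demonstration.
import transaction
from persistent.mapping import PersistentMapping

def warm_cache(db):
    conn = db.open()
    try:
        root = conn.root()
        root['key'] = PersistentMapping()
        transaction.commit()

        # Ask the storage to pull these states into its (shared) cache.
        # Arguments may be OIDs (bytes), persistent ghost objects, or
        # iterables of either, per the changelog entry above.
        conn.prefetch(root._p_oid, root['key'])

        # Prefetching already-cached objects again is effectively a no-op,
        # as the checkPrefetch test demonstrates.
        conn.prefetch(root._p_oid, root['key'])
    finally:
        conn.close()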