Skip to content

Commit

Permalink
Merge pull request #259 from zodb/issue239
Browse files Browse the repository at this point in the history
Add support for prefetch. Fixes #239.
  • Loading branch information
jamadden committed Jun 26, 2019
2 parents 5d9a9d9 + 91a457d commit 41cf87e
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 32 deletions.
7 changes: 6 additions & 1 deletion CHANGES.rst
Expand Up @@ -5,7 +5,12 @@
3.0a4 (unreleased)
==================

- Nothing changed yet.
- Add support for the ZODB 5 ``connection.prefetch(*args)`` API. This
takes either OIDs (``obj._p_oid``) or persistent ghost objects, or
an iterator of those things, and asks the storage to load them into
its cache for use in the future. In RelStorage, this uses the shared
cache and so may be useful for more than one thread. This can be
3x or more faster than loading objects on-demand. See :issue:`239`.


3.0a3 (2019-06-26)
Expand Down
8 changes: 3 additions & 5 deletions src/relstorage/adapters/batch.py
Expand Up @@ -86,21 +86,19 @@ def select_from(self, columns, table, **kw):
The keyword arguments should be of length 1, containing
an iterable of the values to check.
Returns a list of matching rows.
Returns an iterator of matching rows.
"""
assert len(kw) == 1
filter_column, filter_values = kw.popitem()
filter_values = list(filter_values)
results = []
command = 'SELECT %s' % (','.join(columns),)
while filter_values:
filter_subset = filter_values[:self.row_limit]
del filter_values[:self.row_limit]
descriptor = [[(table, (filter_column,)), filter_subset]]
self._do_batch(command, descriptor, rows_need_flattened=False)
results.extend(self.cursor.fetchall())
return results

for row in self.cursor.fetchall():
yield row

def flush(self):
if self.deletes:
Expand Down
15 changes: 13 additions & 2 deletions src/relstorage/adapters/interfaces.py
Expand Up @@ -353,9 +353,20 @@ class IObjectMover(Interface):
"""Move object states to/from the database and within the database."""

def load_current(cursor, oid):
"""Returns the current state and integer tid for an object.
"""
Returns the current state and integer tid for an object.
*oid* is an integer. Returns (None, None) if object does not
exist.
"""

def load_currents(cursor, oids):
"""
Returns the oid integer, state, and integer tid for all the specified
objects.
oid is an integer. Returns (None, None) if object does not exist.
*oids* is an iterable of integers. If any objects do not exist,
they are ignored.
"""

def load_revision(cursor, oid, tid):
Expand Down
22 changes: 20 additions & 2 deletions src/relstorage/adapters/mover.py
Expand Up @@ -59,7 +59,7 @@ def _compute_md5sum(self, data):
"""
SELECT state, tid
FROM current_object
JOIN object_state USING(zoid, tid)
JOIN object_state USING(zoid, tid)
WHERE zoid = %s
""",
"""
Expand Down Expand Up @@ -93,6 +93,24 @@ def load_current(self, cursor, oid):

return None, None

    #: Query descriptors for ``load_currents``: ``(columns, table,
    #: filter_column)`` tuples. The first entry serves
    #: history-preserving schemas (``current_object`` joined to
    #: ``object_state``); the second serves history-free schemas
    #: (``object_state`` alone).
    _load_currents_queries = (
        (('zoid', 'state', 'tid'), 'current_object JOIN object_state USING(zoid, tid)', 'zoid'),
        (('zoid', 'state', 'tid'), 'object_state', 'zoid'),
    )

    _load_currents_query = _query_property('_load_currents')

    @metricmethod_sampled
    def load_currents(self, cursor, oids):
        """
        Iterate over ``(oid, state, tid)`` triples for the current
        revision of each of the given objects.

        *oids* is an iterable of integer OIDs. OIDs that do not exist
        produce no row and are silently skipped.
        """
        columns, table, filter_column = self._load_currents_query
        binary_column_as_state_type = self.driver.binary_column_as_state_type
        # Batch the OIDs so each generated SELECT keeps its IN/ANY
        # list bounded to at most row_limit values.
        batcher = self.make_batcher(cursor, row_limit=1000)
        rows = batcher.select_from(columns, table, **{filter_column: oids})
        for row in rows:
            oid, state, tid = row
            # Normalize the driver's raw column value into the state
            # type (bytes) that callers expect.
            yield oid, binary_column_as_state_type(state), tid

_load_revision_query = """
SELECT state
FROM object_state
Expand Down Expand Up @@ -189,7 +207,7 @@ def current_object_tids(self, cursor, oids):
columns, table, filter_column = self._current_object_tids_query
batcher = self.make_batcher(cursor, row_limit=1000)
rows = batcher.select_from(columns, table, **{filter_column: oids})
res = self._current_object_tids_map_type(rows)
res = self._current_object_tids_map_type(list(rows))
return res

#: A sequence of *names* of attributes on this object that are statements to be
Expand Down
2 changes: 1 addition & 1 deletion src/relstorage/adapters/postgresql/tests/test_mover.py
Expand Up @@ -72,7 +72,7 @@ def test_prepare_load_current(self):
[
'PREPARE load_current (BIGINT) AS SELECT state, tid\n'
' FROM current_object\n'
' JOIN object_state USING(zoid, tid)\n'
' JOIN object_state USING(zoid, tid)\n'
' WHERE zoid = $1',
'PREPARE load_current (BIGINT) AS SELECT state, tid\n'
' FROM object_state\n'
Expand Down
14 changes: 8 additions & 6 deletions src/relstorage/adapters/tests/test_batch.py
Expand Up @@ -247,7 +247,7 @@ def test_flush(self):
def test_select_one(self):
cursor = MockCursor()
batcher = self.getClass()(cursor)
batcher.select_from(('zoid', 'tid'), 'object_state', oids=(1,))
list(batcher.select_from(('zoid', 'tid'), 'object_state', oids=(1,)))
self.assertEqual(cursor.executed, [
('SELECT zoid,tid FROM object_state WHERE oids IN (%s)',
(1,))
Expand All @@ -256,8 +256,8 @@ def test_select_one(self):
def test_select_multiple_one_batch(self):
cursor = MockCursor()
batcher = self.getClass()(cursor)
batcher.select_from(('zoid', 'tid'), 'object_state',
oids=(1, 2, 3, 4))
list(batcher.select_from(('zoid', 'tid'), 'object_state',
oids=(1, 2, 3, 4)))
self.assertEqual(cursor.executed, [
('SELECT zoid,tid FROM object_state WHERE oids IN (%s,%s,%s,%s)',
(1, 2, 3, 4))
Expand All @@ -274,6 +274,7 @@ def test_select_multiple_many_batch(self):
batcher.row_limit = 2
rows = batcher.select_from(('zoid', 'tid'), 'object_state',
oids=(1, 2, 3, 4, 5))
rows = list(rows)
self.assertEqual(cursor.executed, [
('SELECT zoid,tid FROM object_state WHERE oids IN (%s,%s)',
(1, 2,)),
Expand Down Expand Up @@ -402,7 +403,7 @@ def getClass(self):
def test_select_one(self):
cursor = MockCursor()
batcher = self.getClass()(cursor)
batcher.select_from(('zoid', 'tid'), 'object_state', oids=(1,))
list(batcher.select_from(('zoid', 'tid'), 'object_state', oids=(1,)))
self.assertEqual(cursor.executed, [
('SELECT zoid,tid FROM object_state WHERE oids = ANY (%s)',
([1,],))
Expand All @@ -411,8 +412,8 @@ def test_select_one(self):
def test_select_multiple_one_batch(self):
cursor = MockCursor()
batcher = self.getClass()(cursor)
batcher.select_from(('zoid', 'tid'), 'object_state',
oids=(1, 2, 3, 4))
list(batcher.select_from(('zoid', 'tid'), 'object_state',
oids=(1, 2, 3, 4)))
self.assertEqual(cursor.executed, [
('SELECT zoid,tid FROM object_state WHERE oids = ANY (%s)',
([1, 2, 3, 4],))
Expand All @@ -429,6 +430,7 @@ def test_select_multiple_many_batch(self):
batcher.row_limit = 2
rows = batcher.select_from(('zoid', 'tid'), 'object_state',
oids=(1, 2, 3, 4, 5))
rows = list(rows)
self.assertEqual(cursor.executed, [
('SELECT zoid,tid FROM object_state WHERE oids = ANY (%s)',
([1, 2,],)),
Expand Down
52 changes: 49 additions & 3 deletions src/relstorage/cache/storage_cache.py
Expand Up @@ -15,6 +15,8 @@
from __future__ import division
from __future__ import print_function

# pylint:disable=too-many-lines

import logging
import os
import threading
Expand Down Expand Up @@ -466,20 +468,19 @@ def load(self, cursor, oid_int):
# Make a list of cache keys to query. The list will have either
# 1 or 2 keys.
cp0, cp1 = self.checkpoints
tid1 = cp0
tid2 = None
tid_int = self.delta_after1.get(oid_int)
if tid_int:
tid2 = tid_int
elif cp1 != cp0:
tid2 = cp1

preferred_key = (oid_int, tid1)
preferred_key = (oid_int, cp0)

# Query the cache. Query multiple keys simultaneously to
# minimize latency. The client is responsible for moving
# the data to the preferred key if it wasn't found there.
response = cache(oid_int, tid1, tid2)
response = cache(oid_int, cp0, tid2)
if response: # We have a hit!
state, actual_tid = response
return state, actual_tid
Expand All @@ -491,6 +492,51 @@ def load(self, cursor, oid_int):
cache[preferred_key] = (state, tid_int)
return state, tid_int

    def prefetch(self, cursor, oid_ints):
        """
        Load the states for *oid_ints* (an iterable of integer OIDs)
        into the cache, querying the database only for OIDs that are
        not already cached.

        Mirrors the cache-key selection logic of ``load()``, but
        batches the database access for all missing OIDs into a
        single ``load_currents`` call.
        """
        # Just like load(), but we only fetch the OIDs
        # we can't find in the cache.
        if not self.checkpoints:
            # No point even trying, we would just throw the results away
            return

        to_fetch = OID_OBJECT_MAP_TYPE() # {oid: cache key}
        cache = self.cache
        cp0, cp1 = self.checkpoints
        # Hoist the bound .get lookups out of the loop.
        delta_after0 = self.delta_after0.get
        delta_after1 = self.delta_after1.get
        for oid_int in oid_ints:
            tid_int = delta_after0(oid_int)
            if tid_int:
                key = (oid_int, tid_int)
                cache_data = cache[key]
                if not cache_data:
                    # That was our one place, so we must fetch
                    to_fetch[oid_int] = key
                continue

            tid2 = None
            tid_int = delta_after1(oid_int)
            if tid_int:
                tid2 = tid_int
            elif cp1 != cp0:
                tid2 = cp1

            # Multi-key cache probe, same as load(); a hit means there
            # is nothing to do for this OID.
            cache_data = cache(oid_int, cp0, tid2)
            if not cache_data:
                preferred_key = (oid_int, cp0)
                to_fetch[oid_int] = preferred_key

        if not to_fetch:
            return

        for oid, state, tid_int in self.adapter.mover.load_currents(cursor, to_fetch):
            key = to_fetch[oid]
            # Note that we're losing the knowledge of whether the TID
            # in the key came from delta_after0 or not, so we're not
            # validating that part.
            self._check_tid_after_load(oid, tid_int)
            cache[key] = (state, tid_int)

def tpc_begin(self):
"""Prepare temp space for objects to cache."""
q = self.temp_objects = _TemporaryStorage()
Expand Down
36 changes: 24 additions & 12 deletions src/relstorage/storage.py
Expand Up @@ -555,25 +555,28 @@ def _before_load(self):
self._restart_load_and_poll()
assert self._load_transaction_open == 'active'

@Metric(method=True, rate=0.1)
def load(self, oid, version=''):
# pylint:disable=unused-argument
def __load_using_method(self, meth, argument):
if self._stale_error is not None:
raise self._stale_error

oid_int = bytes8_to_int64(oid)
cache = self._cache

with self._lock:
self._before_load()
cursor = self._load_cursor
try:
state, tid_int = cache.load(cursor, oid_int)
return meth(cursor, argument)
except CacheConsistencyError:
log.exception("Cache consistency error; restarting load")
self._drop_load_connection()
raise

@Metric(method=True, rate=0.1)
def load(self, oid, version=''):
# pylint:disable=unused-argument

oid_int = bytes8_to_int64(oid)

state, tid_int = self.__load_using_method(self._cache.load, oid_int)

if tid_int is None:
self._log_keyerror(oid_int, "no tid found")
raise POSKeyError(oid)
Expand All @@ -585,11 +588,20 @@ def load(self, oid, version=''):
raise POSKeyError(oid)
return state, int64_to_8bytes(tid_int)

def getTid(self, oid):
if self._stale_error is not None:
raise self._stale_error
    def prefetch(self, oids):
        """
        Implement the ZODB 5 ``IStorage.prefetch`` API: ask the cache
        to pre-load the states for *oids* (an iterable of 8-byte OID
        strings) so that future loads can be served from cache.

        This method never raises: prefetching is strictly an
        optimization, so failures are logged and swallowed.
        """
        prefetch = self._cache.prefetch
        oid_ints = [bytes8_to_int64(oid) for oid in oids]
        try:
            self.__load_using_method(prefetch, oid_ints)
        except Exception: # pylint:disable=broad-except
            # This could raise self._stale_error, or
            # CacheConsistencyError. Both of those mean that regular loads
            # may fail too, but we don't know what our transaction state is
            # at this time, so we don't want to raise it to the caller.
            log.exception("Failed to prefetch")

_state, serial = self.load(oid, '')
def getTid(self, oid):
_state, serial = self.load(oid)
return serial

def loadEx(self, oid, version=''):
Expand Down Expand Up @@ -633,7 +645,7 @@ def loadBefore(self, oid, tid):
else:
self._before_load()
cursor = self._load_cursor
if not self._adapter.mover.exists(cursor, bytes8_to_int64(oid)):
if not self._adapter.mover.exists(cursor, oid_int):
raise POSKeyError(oid)

state, start_tid = self._adapter.mover.load_before(
Expand Down
21 changes: 21 additions & 0 deletions src/relstorage/tests/reltestbase.py
Expand Up @@ -29,6 +29,7 @@
from zc.zlibstorage import ZlibStorage

import ZODB.tests.util
from ZODB.utils import z64
from ZODB.DB import DB
from ZODB.FileStorage import FileStorage
from ZODB.POSException import ReadConflictError
Expand Down Expand Up @@ -922,6 +923,26 @@ def checkGeventSwitchesOnOpen(self):
with assert_switches():
self.open()

    def checkPrefetch(self):
        """
        End-to-end test of the ``prefetch`` storage API: after the
        cache is cleared, prefetching the root and a stored mapping
        repopulates the cache, and a repeated prefetch is a no-op.
        """
        db = DB(self._storage)
        conn = db.open()

        mapping = conn.root()['key'] = PersistentMapping()

        transaction.commit()

        # NOTE(review): the exact entry counts (3 after commit, 2
        # after prefetch) are implementation details of the cache;
        # confirm against the cache's key scheme if they change.
        self.assertEqual(3, len(self._storage._cache))
        self._storage._cache.clear()
        self.assertEmpty(self._storage._cache)

        # prefetch accepts both raw OIDs (z64 is the root's OID) and
        # persistent (ghost) objects.
        conn.prefetch(z64, mapping)

        self.assertEqual(2, len(self._storage._cache))

        # second time is a no-op
        conn.prefetch(z64, mapping)
        self.assertEqual(2, len(self._storage._cache))


class AbstractRSZodbConvertTests(StorageCreatingMixin,
FSZODBConvertTests,
Expand Down

0 comments on commit 41cf87e

Please sign in to comment.