Skip to content

Commit

Permalink
Merge e71b01d into 86ddaf4
Browse files Browse the repository at this point in the history
  • Loading branch information
jamadden committed Jun 26, 2019
2 parents 86ddaf4 + e71b01d commit 9922df6
Show file tree
Hide file tree
Showing 13 changed files with 724 additions and 656 deletions.
4 changes: 4 additions & 0 deletions CHANGES.rst
Expand Up @@ -36,6 +36,10 @@
limit was reached which could result in blobs appearing to be
spuriously empty. This was only observed on macOS. See :issue:`219`.

- Fix a bug in computing the cache delta maps when restoring from
  the persistent cache that could cause data from a single transaction
  to be stale, leading to spurious conflicts.

3.0a2 (2019-06-19)
==================

Expand Down
10 changes: 8 additions & 2 deletions src/relstorage/_compat.py
Expand Up @@ -46,8 +46,14 @@ def list_values(d):
iterkeys = dict.iterkeys
itervalues = dict.itervalues

OID_TID_MAP_TYPE = BTrees.family64.II.BTree if not PYPY else dict
OID_OBJECT_MAP_TYPE = BTrees.family64.IO.BTree if not PYPY else dict
if not PYPY:
    # On CPython, use 64-bit integer-keyed BTrees for the oid->tid and
    # oid->object maps and the oid set. NOTE(review): presumably chosen
    # because BTrees scale better than dicts for large integer key
    # spaces on CPython — confirm against project benchmarks.
    OID_TID_MAP_TYPE = BTrees.family64.II.BTree
    OID_OBJECT_MAP_TYPE = BTrees.family64.IO.BTree
    OID_SET_TYPE = BTrees.family64.II.TreeSet
else:
    # On PyPy the plain builtin types are used instead.
    # NOTE(review): assumed to be a PyPy performance choice — verify.
    OID_TID_MAP_TYPE = dict
    OID_OBJECT_MAP_TYPE = dict
    OID_SET_TYPE = set

def iteroiditems(d):
# Could be either a BTree, which always has 'iteritems',
Expand Down
20 changes: 19 additions & 1 deletion src/relstorage/cache/local_client.py
Expand Up @@ -162,6 +162,24 @@ def restore(self, row_filter=None):
with closing(conn):
self.read_from_sqlite(conn, row_filter)

@_log_timed
def remove_invalid_persistent_oids(self, bad_oids):
    """
    Remove data from the persistent cache for the given oids.

    Does nothing when no persistent cache directory is configured.
    """
    options = self.options
    if not options.cache_local_dir:
        # No on-disk cache; nothing to clean up.
        return

    removed = 0
    conn = '(no oids to remove)'
    if bad_oids:
        conn = sqlite_connect(options, self.prefix, close_async=False)
        with closing(conn):
            database = Database.from_connection(conn)
            removed = database.remove_invalid_persistent_oids(bad_oids)
    logger.debug("Removed %d invalid OIDs from %s", removed, conn)

def zap_all(self):
_, destroy = sqlite_files(self.options, self.prefix)
destroy()
Expand Down Expand Up @@ -495,7 +513,7 @@ def write_to_sqlite(self, connection):
# saw a new TID for it, but we had nothing to replace it with.
min_allowed_writeback = OID_TID_MAP_TYPE()
for k, v in self._min_allowed_writeback.items():
if stored_oid_tid.get(k) < v:
if stored_oid_tid.get(k, MAX_TID) < v:
min_allowed_writeback[k] = v
db.trim_to_size(self.limit, min_allowed_writeback)
del min_allowed_writeback
Expand Down
12 changes: 12 additions & 0 deletions src/relstorage/cache/local_database.py
Expand Up @@ -146,6 +146,18 @@ def checkpoints(self):
self.cursor.execute("SELECT cp0, cp1 FROM checkpoints")
return self.cursor.fetchone()

def remove_invalid_persistent_oids(self, bad_oids):
    """
    Delete the ``object_state`` rows for every oid in *bad_oids*.

    Returns the total number of delete operations issued.
    """
    cursor = self.cursor
    cursor.execute("BEGIN")
    # Each delete binds a single parameter (the zoid); 999 is SQLite's
    # default bound-variable limit per statement, hence the batch size
    # of 999 // 1.
    batcher = RowBatcher(cursor,
                         row_limit=999 // 1,
                         delete_placeholder='?')
    for bad_oid in bad_oids:
        batcher.delete_from('object_state', zoid=bad_oid)
    batcher.flush()
    cursor.execute("COMMIT")
    return batcher.total_rows_deleted

def fetch_rows_by_priority(self):
"""
The returned cursor will iterate ``(zoid, tid, state, tid)``
Expand Down
98 changes: 62 additions & 36 deletions src/relstorage/cache/storage_cache.py
Expand Up @@ -29,6 +29,7 @@
from relstorage.autotemp import AutoTemporaryFile
from relstorage._compat import OID_TID_MAP_TYPE
from relstorage._compat import OID_OBJECT_MAP_TYPE
from relstorage._compat import OID_SET_TYPE
from relstorage._compat import iteroiditems
from relstorage._util import log_timed

Expand Down Expand Up @@ -77,9 +78,11 @@ class StorageCache(object):

# current_tid contains the last polled transaction ID. Invariant:
# when self.checkpoints is not None, self.delta_after0 has info
# from all transactions in the range:
# from *all* transactions in the range:
#
# self.checkpoints[0] < tid <= self.current_tid
# (self.checkpoints[0], self.current_tid]
#
# (That is, `tid > self.checkpoints[0] and tid <= self.current_tid`)
#
# We assign to this *only* after executing a poll, or
# when reading data from the persistent cache (which happens at
Expand Down Expand Up @@ -222,6 +225,7 @@ def restore(self):
# comes back in the delta_map; that's ok.
row_filter = _PersistentRowFilter(self.adapter, self._delta_map_type)
self.local_client.restore(row_filter)
self.local_client.remove_invalid_persistent_oids(row_filter.polled_invalid_oids)

self.checkpoints = self.local_client.get_checkpoints()
if self.checkpoints:
Expand Down Expand Up @@ -775,6 +779,7 @@ def __init__(self, adapter, delta_type):
self.adapter = adapter
self.delta_after0 = delta_type()
self.delta_after1 = delta_type()
self.polled_invalid_oids = OID_SET_TYPE()

def __call__(self, checkpoints, row_iter):
if not checkpoints:
Expand All @@ -801,29 +806,30 @@ def __call__(self, checkpoints, row_iter):
value = row[2:]
oid = key[0]
actual_tid = value[1]

if actual_tid >= cp0:
# XXX: This is absolutely not right. We'll poll
# for changes *after* cp0 (because we set that as
# our current_tid/the storage's prev_polled_tid)
# and update self._delta_after0, but we won't poll
# for changes *after* cp1. self._delta_after1 is
# only ever populated when we shift checkpoints;
# we assume any changes that happen after that
# point we catch in an updated self._delta_after0.
# But because we're using >= here, instead of
# strictly >, things that actually changed in
# exactly cp0 we'll miss; they'll wind up as a
# trusted key in delta_after1, and that state may
# be outdated.
#
# Also, because we're combining data in the local database from
# multiple sources, it's *possible* that some old cache
# had checkpoints that are behind what we're working with now.
# So we can't actually trust anything that we would put in delta_after1
# without validating them.
# See __poll_replace_checkpoints() to see how we build
# the delta maps.
#
# We'll poll for changes *after* cp0
# (because we set that as our current_tid/the
# storage's prev_polled_tid) and update
# self._delta_after0, but we won't poll for changes
# *after* cp1. self._delta_after1 is only ever
# populated when we shift checkpoints; we assume any
# changes that happen after that point we catch in an
# updated self._delta_after0.
#
# Also, because we're combining data in the local
# database from multiple sources, it's *possible* that
# some old cache had checkpoints that are behind what
# we're working with now. So we can't actually trust
# anything that we would put in delta_after1 without
# validating them. We still return it, but we may take
# it out of delta_after0 if it turns out to be
# invalid.

if actual_tid > cp0:
delta_after0[oid] = actual_tid
elif actual_tid >= cp1:
elif actual_tid > cp1:
delta_after1[oid] = actual_tid
else:
# This is too old and outside our checkpoints for
Expand All @@ -834,7 +840,7 @@ def __call__(self, checkpoints, row_iter):
# and the storage won't poll this far back.
#
# The solution is to hold onto it and run a manual poll ourself;
# if it's still valid, good. If not, we really should try to
# if it's still valid, good. If not, someone should
# remove it from the database so we don't keep checking.
# We also should only do this poll if we have room in our cache
# still (that should rarely be an issue; our db write size
Expand All @@ -843,11 +849,16 @@ def __call__(self, checkpoints, row_iter):
needs_checked[oid] = value
continue
yield key, value

# Now validate things that need validated.

# TODO: Should this be a configurable option, like ZEO's
# 'drop-rather-invalidate'? So far I haven't seen signs that
# this will be particularly slow or burdensome.
self._poll_delta_after1()

if needs_checked:
# TODO: Should this be a configurable option, like ZEO's
# 'drop-rather-invalidate'? So far I haven't seen signs that
# this will be particularly slow or burdensome.
self._poll_old_oids(needs_checked)
self._poll_old_oids_and_remove(needs_checked)
for oid, value in iteroiditems(needs_checked):
# Anything left is guaranteed to still be at the tid we recorded
# for it (except in the event of a concurrent transaction that
Expand All @@ -856,7 +867,7 @@ def __call__(self, checkpoints, row_iter):
yield (oid, cp0), value

@log_timed
def _poll_old_oids(self, to_check):
def _poll_old_oids_and_remove(self, to_check):
oids = list(to_check)
# In local tests, this function executes against PostgreSQL 11 in .78s
# for 133,002 older OIDs; or, .35s for 57,002 OIDs against MySQL 5.7.
Expand All @@ -866,16 +877,31 @@ def callback(_conn, cursor):
current_tids_for_oids = self.adapter.connmanager.open_and_call(callback)

for oid in oids:
# TODO: Propagate these removals down to the database.
if oid not in current_tids_for_oids:
# Removed.
del to_check[oid]
elif to_check[oid][1] != current_tids_for_oids[oid]:
# Changed.
if (oid not in current_tids_for_oids
or to_check[oid][1] != current_tids_for_oids[oid]):
del to_check[oid]
self.polled_invalid_oids.add(oid)

logger.debug("Polled %d older oids stored in cache; %d survived",
len(oids), len(to_check))

@log_timed
def _poll_delta_after1(self):
    """
    Re-validate every entry in ``delta_after1`` against the database.

    ``delta_after1`` is replaced with a fresh map of the tids the
    database currently reports; any oid that vanished or whose tid no
    longer matches is added to ``polled_invalid_oids``.
    """
    previous = self.delta_after1
    oids = list(previous)
    logger.debug("Polling %d oids in delta_after1", len(oids))

    def callback(_conn, cursor):
        return self.adapter.mover.current_object_tids(cursor, oids)

    current_tids_for_oids = self.adapter.connmanager.open_and_call(callback)
    self.delta_after1 = type(previous)(current_tids_for_oids)

    invalid_oids = set()
    for oid, tid in iteroiditems(previous):
        # Invalid if the oid disappeared or moved to a different tid.
        if oid not in self.delta_after1 or self.delta_after1[oid] != tid:
            invalid_oids.add(oid)
    self.polled_invalid_oids.update(invalid_oids)
    logger.debug("Polled %d oids in delta_after1; %d survived",
                 len(oids), len(oids) - len(invalid_oids))

class _TemporaryStorage(object):
def __init__(self):
Expand Down
2 changes: 2 additions & 0 deletions src/relstorage/cache/tests/test_local_client.py
Expand Up @@ -512,6 +512,8 @@ def get_cache_files():
cache_files = get_cache_files()
self.assertEqual(len_initial_cache_files, len(cache_files))

c3.remove_invalid_persistent_oids([0])

# At no point did we spawn extra threads
self.assertEqual(1, threading.active_count())

Expand Down
12 changes: 12 additions & 0 deletions src/relstorage/cache/tests/test_local_database.py
Expand Up @@ -99,6 +99,18 @@ def test_move_from_temp_mixed_updates(self):
self.assertEqual(rows_in_db[1], (1, 1, b'1', 1))
self.assertEqual(rows_in_db[2], (2, 2, b'2b', 2))

def test_remove_invalid_persistent_oids(self):
    # Seed the database with two stored rows.
    seeded_rows = [
        (0, 1, b'0', 0),
        (1, 1, b'0', 0),
    ]
    self.db.store_temp(seeded_rows)
    self.db.move_from_temp()

    # Request removal of far more oids than fit in one SQLite
    # statement, exercising the batching path; the reported count
    # covers every requested oid, present or not.
    invalid_oids = range(1, 5000)
    removed = self.db.remove_invalid_persistent_oids(invalid_oids)
    self.assertEqual(removed, len(invalid_oids))
    # Only oid 0 survives.
    self.assertEqual(dict(self.db.oid_to_tid), {0: 1})

def test_trim_to_size_deletes_stale(self):
rows = [
Expand Down
58 changes: 51 additions & 7 deletions src/relstorage/cache/tests/test_storage_cache.py
Expand Up @@ -615,21 +615,65 @@ def test_deltas(self):
cp0 = 5000
cp1 = 4000

tid_after0 = 5000
tid_after1 = 4000
tid_after0 = 5001
tid_after1 = 4001
# The old_tid, outside the checkpoint range,
# will get dropped.
# will get completely dropped.
old_tid = 3999

rows = [
(1, tid_after0, b'1', tid_after0),
(0, tid_after0, b'0', tid_after0),
(1, cp0, b'1', cp0),
(2, tid_after1, b'2', tid_after1),
(3, old_tid, b'3', old_tid)
(3, cp1, b'3', cp1),
(4, old_tid, b'4', old_tid)
]

results = list(f((cp0, cp1), rows))

self.assertEqual(results, [
((1, tid_after0), (b'1', tid_after0)),
((2, tid_after1), (b'2', tid_after1)),
(rows[0][:2], rows[0][2:]),
(rows[1][:2], rows[1][2:]),
(rows[2][:2], rows[2][2:]),
])

self.assertEqual(dict(f.delta_after0), {0: 5001})
# We attempted validation on this, and we found nothing,
# so we can't claim knowledge.
self.assertEqual(dict(f.delta_after1), {})
# 1 and 2 were polled because they would go in delta_after_1,
# 3 and 4 were polled because they fall outside the checkpoint ranges
self.assertEqual(set(f.polled_invalid_oids), {1, 2, 3, 4})

# Let's verify we can find things we poll for.
f = self._makeOne()
f.adapter.mover.data[2] = (b'', tid_after1)
f.adapter.mover.data[4] = (b'', old_tid)
results = list(f((cp0, cp1), rows))

self.assertEqual(results, [
(rows[0][:2], rows[0][2:]),
(rows[1][:2], rows[1][2:]),
(rows[2][:2], rows[2][2:]),
((4, 5000), rows[4][2:]),
])

self.assertEqual(dict(f.delta_after0), {0: tid_after0})
self.assertEqual(dict(f.delta_after1), {2: tid_after1})
self.assertEqual(set(f.polled_invalid_oids), {1, 3})

# Test when the tid doesn't match
f = self._makeOne()
f.adapter.mover.data[2] = (b'', tid_after1 + 2)
f.adapter.mover.data[4] = (b'', old_tid + 1)
results = list(f((cp0, cp1), rows))

self.assertEqual(results, [
(rows[0][:2], rows[0][2:]),
(rows[1][:2], rows[1][2:]),
(rows[2][:2], rows[2][2:]),
])

self.assertEqual(dict(f.delta_after0), {0: tid_after0})
self.assertEqual(dict(f.delta_after1), {2: tid_after1 + 2})
self.assertEqual(set(f.polled_invalid_oids), {1, 2, 3, 4})

0 comments on commit 9922df6

Please sign in to comment.