diff --git a/CHANGES.rst b/CHANGES.rst index 1cf38d58..44238658 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -36,6 +36,10 @@ limit was reached which could result in blobs appearing to be spuriously empty. This was only observed on macOS. See :issue:`219`. +- Fix a bug computing the cache delta maps when restoring from + persistent cache that could cause data from a single transaction to + be stale, leading to spurious conflicts. + 3.0a2 (2019-06-19) ================== diff --git a/src/relstorage/_compat.py b/src/relstorage/_compat.py index 5c376f73..349b7514 100644 --- a/src/relstorage/_compat.py +++ b/src/relstorage/_compat.py @@ -46,8 +46,14 @@ def list_values(d): iterkeys = dict.iterkeys itervalues = dict.itervalues -OID_TID_MAP_TYPE = BTrees.family64.II.BTree if not PYPY else dict -OID_OBJECT_MAP_TYPE = BTrees.family64.IO.BTree if not PYPY else dict +if not PYPY: + OID_TID_MAP_TYPE = BTrees.family64.II.BTree + OID_OBJECT_MAP_TYPE = BTrees.family64.IO.BTree + OID_SET_TYPE = BTrees.family64.II.TreeSet +else: + OID_TID_MAP_TYPE = dict + OID_OBJECT_MAP_TYPE = dict + OID_SET_TYPE = set def iteroiditems(d): # Could be either a BTree, which always has 'iteritems', diff --git a/src/relstorage/cache/local_client.py b/src/relstorage/cache/local_client.py index 00511329..0b47ab75 100644 --- a/src/relstorage/cache/local_client.py +++ b/src/relstorage/cache/local_client.py @@ -162,6 +162,24 @@ def restore(self, row_filter=None): with closing(conn): self.read_from_sqlite(conn, row_filter) + @_log_timed + def remove_invalid_persistent_oids(self, bad_oids): + """ + Remove data from the persistent cache for the given oids. + """ + options = self.options + if not options.cache_local_dir: + return + + count_removed = 0 + conn = '(no oids to remove)' + if bad_oids: + conn = sqlite_connect(options, self.prefix, close_async=False) + with closing(conn): + db = Database.from_connection(conn) + count_removed = db.remove_invalid_persistent_oids(bad_oids) + logger.debug("Removed %d invalid OIDs from %s", count_removed, conn) + def zap_all(self): _, destroy = sqlite_files(self.options, self.prefix) destroy() @@ -495,7 +513,7 @@ def write_to_sqlite(self, connection): # saw a new TID for it, but we had nothing to replace it with. 
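+        # Note on the fix below: for an OID that is no longer stored
+        # at all, plain ``stored_oid_tid.get(k)`` returns None; on
+        # Python 3 ``None < v`` raises TypeError, and on Python 2 it
+        # is always True, retaining a floor for a row we cannot
+        # replace. Defaulting to MAX_TID makes the test False, so the
+        # entry is simply dropped.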
min_allowed_writeback = OID_TID_MAP_TYPE() for k, v in self._min_allowed_writeback.items(): - if stored_oid_tid.get(k) < v: + if stored_oid_tid.get(k, MAX_TID) < v: min_allowed_writeback[k] = v db.trim_to_size(self.limit, min_allowed_writeback) del min_allowed_writeback diff --git a/src/relstorage/cache/local_database.py b/src/relstorage/cache/local_database.py index 848ff178..1d578562 100644 --- a/src/relstorage/cache/local_database.py +++ b/src/relstorage/cache/local_database.py @@ -146,6 +146,18 @@ def checkpoints(self): self.cursor.execute("SELECT cp0, cp1 FROM checkpoints") return self.cursor.fetchone() + def remove_invalid_persistent_oids(self, bad_oids): + cur = self.cursor + cur.execute("BEGIN") + batch = RowBatcher(cur, + row_limit=999 // 1, + delete_placeholder='?') + for oid in bad_oids: + batch.delete_from('object_state', zoid=oid) + batch.flush() + cur.execute("COMMIT") + return batch.total_rows_deleted + def fetch_rows_by_priority(self): """ The returned cursor will iterate ``(zoid, tid, state, tid)`` diff --git a/src/relstorage/cache/storage_cache.py b/src/relstorage/cache/storage_cache.py index f616e1c0..796b9050 100644 --- a/src/relstorage/cache/storage_cache.py +++ b/src/relstorage/cache/storage_cache.py @@ -29,6 +29,7 @@ from relstorage.autotemp import AutoTemporaryFile from relstorage._compat import OID_TID_MAP_TYPE from relstorage._compat import OID_OBJECT_MAP_TYPE +from relstorage._compat import OID_SET_TYPE from relstorage._compat import iteroiditems from relstorage._util import log_timed @@ -77,9 +78,11 @@ class StorageCache(object): # current_tid contains the last polled transaction ID. Invariant: # when self.checkpoints is not None, self.delta_after0 has info - # from all transactions in the range: + # from *all* transactions in the range: # - # self.checkpoints[0] < tid <= self.current_tid + # (self.checkpoints[0], self.current_tid] + # + # (That is, `tid > self.checkpoints[0] and tid <= self.current_tid`) # # We assign to this *only* after executing a poll, or # when reading data from the persistent cache (which happens at @@ -222,6 +225,7 @@ def restore(self): # comes back in the delta_map; that's ok. row_filter = _PersistentRowFilter(self.adapter, self._delta_map_type) self.local_client.restore(row_filter) + self.local_client.remove_invalid_persistent_oids(row_filter.polled_invalid_oids) self.checkpoints = self.local_client.get_checkpoints() if self.checkpoints: @@ -775,6 +779,7 @@ def __init__(self, adapter, delta_type): self.adapter = adapter self.delta_after0 = delta_type() self.delta_after1 = delta_type() + self.polled_invalid_oids = OID_SET_TYPE() def __call__(self, checkpoints, row_iter): if not checkpoints: @@ -801,29 +806,30 @@ def __call__(self, checkpoints, row_iter): value = row[2:] oid = key[0] actual_tid = value[1] - - if actual_tid >= cp0: - # XXX: This is absolutely not right. We'll poll - # for changes *after* cp0 (because we set that as - # our current_tid/the storage's prev_polled_tid) - # and update self._delta_after0, but we won't poll - # for changes *after* cp1. self._delta_after1 is - # only ever populated when we shift checkpoints; - # we assume any changes that happen after that - # point we catch in an updated self._delta_after0. - # But because we're using >= here, instead of - # strictly >, things that actually changed in - # exactly cp0 we'll miss; they'll wind up as a - # trusted key in delta_after1, and that state may - # be outdated. 
-            #
-            # Also, because we're combining data in the local database from
-            # multiple sources, it's *possible* that some old cache
-            # had checkpoints that are behind what we're working with now.
-            # So we can't actually trust anything that we would put in delta_after1
-            # without validating them.
+            # See __poll_replace_checkpoints() for how we build
+            # the delta maps.
+            #
+            # We'll poll for changes *after* cp0
+            # (because we set that as our current_tid/the
+            # storage's prev_polled_tid) and update
+            # self._delta_after0, but we won't poll for changes
+            # *after* cp1. self._delta_after1 is only ever
+            # populated when we shift checkpoints; we assume any
+            # changes that happen after that point will be caught
+            # in an updated self._delta_after0.
+            #
+            # Also, because we're combining data in the local
+            # database from multiple sources, it's *possible* that
+            # some old cache had checkpoints that are behind what
+            # we're working with now. So we can't actually trust
+            # anything that we would put in delta_after1 without
+            # validating it. We still return the row, but we may
+            # take its entry out of delta_after1 if it turns out
+            # to be invalid.
+
+            if actual_tid > cp0:
                 delta_after0[oid] = actual_tid
-            elif actual_tid >= cp1:
+            elif actual_tid > cp1:
                 delta_after1[oid] = actual_tid
             else:
                 # This is too old and outside our checkpoints for
@@ -834,7 +840,7 @@ def __call__(self, checkpoints, row_iter):
                 # and the storage won't poll this far back.
                 #
                 # The solution is to hold onto it and run a manual poll ourself;
-                # if it's still valid, good. If not, we really should try to
+                # if it's still valid, good. If not, we make a note of it to
                 # remove it from the database so we don't keep checking.
                 # We also should only do this poll if we have room in our cache
                 # still (that should rarely be an issue; our db write size
@@ -843,11 +849,16 @@ def __call__(self, checkpoints, row_iter):
                 needs_checked[oid] = value
                 continue
             yield key, value
+
+        # Now validate the entries that need validation.
+
+        # TODO: Should this be a configurable option, like ZEO's
+        # 'drop-cache-rather-verify'? So far I haven't seen signs that
+        # this will be particularly slow or burdensome.
+        self._poll_delta_after1()
+
         if needs_checked:
-            # TODO: Should this be a configurable option, like ZEO's
-            # 'drop-rather-invalidate'? So far I haven't seen signs that
-            # this will be particularly slow or burdensome.
-            self._poll_old_oids(needs_checked)
+            self._poll_old_oids_and_remove(needs_checked)
         for oid, value in iteroiditems(needs_checked):
             # Anything left is guaranteed to still be at the tid we recorded
             # for it (except in the event of a concurrent transaction that
@@ -856,7 +867,7 @@ def __call__(self, checkpoints, row_iter):
             yield (oid, cp0), value

     @log_timed
-    def _poll_old_oids(self, to_check):
+    def _poll_old_oids_and_remove(self, to_check):
         oids = list(to_check)
         # In local tests, this function executes against PostgreSQL 11 in .78s
         # for 133,002 older OIDs; or, .35s for 57,002 OIDs against MySQL 5.7.
@@ -866,16 +877,31 @@ def callback(_conn, cursor):
         current_tids_for_oids = self.adapter.connmanager.open_and_call(callback)

         for oid in oids:
-            # TODO: Propagate these removals down to the database.
-            if oid not in current_tids_for_oids:
-                # Removed.
-                del to_check[oid]
-            elif to_check[oid][1] != current_tids_for_oids[oid]:
-                # Changed.
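+            # An entry survives only if the database still has the OID
+            # at exactly the TID the cache recorded. Anything else is
+            # noted in ``polled_invalid_oids`` so that restore() can
+            # delete it from the persistent cache via
+            # remove_invalid_persistent_oids().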
+ if (oid not in current_tids_for_oids + or to_check[oid][1] != current_tids_for_oids[oid]): del to_check[oid] + self.polled_invalid_oids.add(oid) + logger.debug("Polled %d older oids stored in cache; %d survived", len(oids), len(to_check)) + @log_timed + def _poll_delta_after1(self): + orig_delta_after1 = self.delta_after1 + oids = list(self.delta_after1) + logger.debug("Polling %d oids in delta_after1", len(oids)) + def callback(_conn, cursor): + return self.adapter.mover.current_object_tids(cursor, oids) + current_tids_for_oids = self.adapter.connmanager.open_and_call(callback) + self.delta_after1 = type(self.delta_after1)(current_tids_for_oids) + invalid_oids = { + oid + for oid, tid in iteroiditems(orig_delta_after1) + if oid not in self.delta_after1 or self.delta_after1[oid] != tid + } + self.polled_invalid_oids.update(invalid_oids) + logger.debug("Polled %d oids in delta_after1; %d survived", + len(oids), len(oids) - len(invalid_oids)) class _TemporaryStorage(object): def __init__(self): diff --git a/src/relstorage/cache/tests/test_local_client.py b/src/relstorage/cache/tests/test_local_client.py index faf4644e..ec48ba40 100644 --- a/src/relstorage/cache/tests/test_local_client.py +++ b/src/relstorage/cache/tests/test_local_client.py @@ -512,6 +512,8 @@ def get_cache_files(): cache_files = get_cache_files() self.assertEqual(len_initial_cache_files, len(cache_files)) + c3.remove_invalid_persistent_oids([0]) + # At no point did we spawn extra threads self.assertEqual(1, threading.active_count()) diff --git a/src/relstorage/cache/tests/test_local_database.py b/src/relstorage/cache/tests/test_local_database.py index 4435d0ba..9aea67ab 100644 --- a/src/relstorage/cache/tests/test_local_database.py +++ b/src/relstorage/cache/tests/test_local_database.py @@ -99,6 +99,18 @@ def test_move_from_temp_mixed_updates(self): self.assertEqual(rows_in_db[1], (1, 1, b'1', 1)) self.assertEqual(rows_in_db[2], (2, 2, b'2b', 2)) + def test_remove_invalid_persistent_oids(self): + rows = [ + (0, 1, b'0', 0), + (1, 1, b'0', 0), + ] + self.db.store_temp(rows) + self.db.move_from_temp() + + invalid_oids = range(1, 5000) + count = self.db.remove_invalid_persistent_oids(invalid_oids) + self.assertEqual(count, len(invalid_oids)) + self.assertEqual(dict(self.db.oid_to_tid), {0: 1}) def test_trim_to_size_deletes_stale(self): rows = [ diff --git a/src/relstorage/cache/tests/test_storage_cache.py b/src/relstorage/cache/tests/test_storage_cache.py index 64de0ab5..49813fc2 100644 --- a/src/relstorage/cache/tests/test_storage_cache.py +++ b/src/relstorage/cache/tests/test_storage_cache.py @@ -615,21 +615,65 @@ def test_deltas(self): cp0 = 5000 cp1 = 4000 - tid_after0 = 5000 - tid_after1 = 4000 + tid_after0 = 5001 + tid_after1 = 4001 # The old_tid, outside the checkpoint range, - # will get dropped. + # will get completely dropped. old_tid = 3999 rows = [ - (1, tid_after0, b'1', tid_after0), + (0, tid_after0, b'0', tid_after0), + (1, cp0, b'1', cp0), (2, tid_after1, b'2', tid_after1), - (3, old_tid, b'3', old_tid) + (3, cp1, b'3', cp1), + (4, old_tid, b'4', old_tid) ] results = list(f((cp0, cp1), rows)) self.assertEqual(results, [ - ((1, tid_after0), (b'1', tid_after0)), - ((2, tid_after1), (b'2', tid_after1)), + (rows[0][:2], rows[0][2:]), + (rows[1][:2], rows[1][2:]), + (rows[2][:2], rows[2][2:]), ]) + + self.assertEqual(dict(f.delta_after0), {0: 5001}) + # We attempted validation on this, and we found nothing, + # so we can't claim knowledge. 
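+        # To spell out the (exclusive, inclusive] ranges for this data:
+        # delta_after0 covers (cp0=5000, current], catching only oid 0
+        # at tid 5001; delta_after1 covers (cp1=4000, cp0=5000],
+        # catching oids 1 (tid 5000) and 2 (tid 4001). The mock mover
+        # has no data yet, so validation invalidates both of them.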
+        self.assertEqual(dict(f.delta_after1), {})
+        # 1 and 2 were polled because they would go in delta_after1;
+        # 3 and 4 were polled because they fall at or below cp1,
+        # outside the checkpoint ranges.
+        self.assertEqual(set(f.polled_invalid_oids), {1, 2, 3, 4})
+
+        # Let's verify we can find things we poll for.
+        f = self._makeOne()
+        f.adapter.mover.data[2] = (b'', tid_after1)
+        f.adapter.mover.data[4] = (b'', old_tid)
+        results = list(f((cp0, cp1), rows))
+
+        self.assertEqual(results, [
+            (rows[0][:2], rows[0][2:]),
+            (rows[1][:2], rows[1][2:]),
+            (rows[2][:2], rows[2][2:]),
+            ((4, 5000), rows[4][2:]),
+        ])
+
+        self.assertEqual(dict(f.delta_after0), {0: tid_after0})
+        self.assertEqual(dict(f.delta_after1), {2: tid_after1})
+        self.assertEqual(set(f.polled_invalid_oids), {1, 3})
+
+        # Test when the tid doesn't match.
+        f = self._makeOne()
+        f.adapter.mover.data[2] = (b'', tid_after1 + 2)
+        f.adapter.mover.data[4] = (b'', old_tid + 1)
+        results = list(f((cp0, cp1), rows))
+
+        self.assertEqual(results, [
+            (rows[0][:2], rows[0][2:]),
+            (rows[1][:2], rows[1][2:]),
+            (rows[2][:2], rows[2][2:]),
+        ])
+
+        self.assertEqual(dict(f.delta_after0), {0: tid_after0})
+        self.assertEqual(dict(f.delta_after1), {2: tid_after1 + 2})
+        self.assertEqual(set(f.polled_invalid_oids), {1, 2, 3, 4})
diff --git a/src/relstorage/tests/README-15.txt b/src/relstorage/tests/README-15.txt
deleted file mode 100644
index 17b324a1..00000000
--- a/src/relstorage/tests/README-15.txt
+++ /dev/null
@@ -1,63 +0,0 @@
-
-Running Tests
-=============
-
-To run tests of both RelStorage 1.4 and 1.5 on the same box, you need
-a set of databases for each version, since they have different schemas.
-Do this after following the instructions in README.txt:
-
-
-PostgreSQL
-----------
-
-Execute the following using the ``psql`` command::
-
-    CREATE DATABASE relstorage15test OWNER relstoragetest;
-    CREATE DATABASE relstorage15test2 OWNER relstoragetest;
-    CREATE DATABASE relstorage15test_hf OWNER relstoragetest;
-    CREATE DATABASE relstorage15test2_hf OWNER relstoragetest;
-
-Also, add the following lines to the top of pg_hba.conf (if you put
-them at the bottom, they may be overridden by other parameters)::
-
-    local relstorage15test relstoragetest md5
-    local relstorage15test2 relstoragetest md5
-    local relstorage15test_hf relstoragetest md5
-    local relstorage15test2_hf relstoragetest md5
-    host relstorage15test relstoragetest 127.0.0.1/32 md5
-    host relstorage15test_hf relstoragetest 127.0.0.1/32 md5
-
-
-
-MySQL
------
-
-    CREATE DATABASE relstorage15test;
-    GRANT ALL ON relstorage15test.* TO 'relstoragetest'@'localhost';
-    CREATE DATABASE relstorage15test2;
-    GRANT ALL ON relstorage15test2.* TO 'relstoragetest'@'localhost';
-    CREATE DATABASE relstorage15test_hf;
-    GRANT ALL ON relstorage15test_hf.* TO 'relstoragetest'@'localhost';
-    CREATE DATABASE relstorage15test2_hf;
-    GRANT ALL ON relstorage15test2_hf.* TO 'relstoragetest'@'localhost';
-    FLUSH PRIVILEGES;
-
-
-Oracle
-------
-
-Using ``sqlplus`` with ``SYS`` privileges, execute the
-following::
-
-    CREATE USER relstorage15test IDENTIFIED BY relstoragetest;
-    GRANT CONNECT, RESOURCE, CREATE TABLE, CREATE SEQUENCE TO relstorage15test;
-    GRANT EXECUTE ON DBMS_LOCK TO relstorage15test;
-    CREATE USER relstoragetest2 IDENTIFIED BY relstorage15test;
-    GRANT CONNECT, RESOURCE, CREATE TABLE, CREATE SEQUENCE TO relstorage15test2;
-    GRANT EXECUTE ON DBMS_LOCK TO relstorage15test2;
-    CREATE USER relstoragetest_hf IDENTIFIED BY relstorage15test;
-    GRANT CONNECT, RESOURCE, CREATE TABLE, CREATE SEQUENCE TO relstorage15test_hf;
-    GRANT EXECUTE ON DBMS_LOCK TO relstorage15test_hf;
-    CREATE USER relstoragetest2_hf IDENTIFIED BY relstorage15test;
-    GRANT CONNECT, RESOURCE, CREATE TABLE, CREATE SEQUENCE TO relstorage15test2_hf;
-    GRANT EXECUTE ON DBMS_LOCK TO relstorage15test2_hf;
diff --git a/src/relstorage/tests/README.txt b/src/relstorage/tests/README.txt
deleted file mode 100644
index 8461916c..00000000
--- a/src/relstorage/tests/README.txt
+++ /dev/null
@@ -1 +0,0 @@
-See doc/developing.rst.
diff --git a/src/relstorage/tests/__init__.py b/src/relstorage/tests/__init__.py
index f3b16526..518bf908 100644
--- a/src/relstorage/tests/__init__.py
+++ b/src/relstorage/tests/__init__.py
@@ -1,7 +1,12 @@
 """relstorage.tests package"""

+import abc
+import os
 import unittest

+import transaction
+
+from relstorage._compat import ABC
 from relstorage.options import Options

 try:
@@ -20,11 +25,111 @@ class TestCase(unittest.TestCase):
         None
     ) or getattr(unittest.TestCase, 'assertRaisesRegexp')

+    __to_close = ()
+
+    def setUp(self):
+        # Subclasses that also mix in ZODB's StorageTestBase get a
+        # temporary directory set up (and changed to) for each test
+        # by this super call.
+        super(TestCase, self).setUp()
+        self.__to_close = []
+
+    def _closing(self, o):
+        """
+        Close the object before tearDown (opposite of addCleanup
+        so that exceptions will propagate).
+
+        Returns the given object.
+        """
+        self.__to_close.append(o)
+        return o
+
+    def tearDown(self):
+        transaction.abort()
+        for x in reversed(self.__to_close):
+            x.close()
+        self.__to_close = ()
+        super(TestCase, self).tearDown()
+
     def assertIsEmpty(self, container):
         self.assertEqual(len(container), 0)

     assertEmpty = assertIsEmpty

+class StorageCreatingMixin(ABC):
+
+    keep_history = None # Override
+    driver_name = None # Override.
+
+    @abc.abstractmethod
+    def make_adapter(self, options):
+        # abstract method
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def get_adapter_class(self):
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def get_adapter_zconfig(self):
+        """
+        Return the part of the ZConfig string that makes the adapter.
+
+        That is, return the <postgresql>, <mysql> or <oracle> section.
+
+        Return text (unicode).
+        """
+        raise NotImplementedError
+
+    def get_adapter_zconfig_replica_conf(self):
+        return os.path.join(os.path.dirname(__file__), 'replicas.conf')
+
+    @abc.abstractmethod
+    def verify_adapter_from_zconfig(self, adapter):
+        """
+        Assert that the adapter configured from get_adapter_zconfig
+        is properly configured.
+        """
+        raise NotImplementedError
+
+    def _wrap_storage(self, storage):
+        return storage
+
+    def make_storage(self, zap=True, **kw):
+        from . import util
+        from relstorage.storage import RelStorage
+
+        if ('cache_servers' not in kw
+                and 'cache_module_name' not in kw
+                and kw.get('share_local_cache', True)):
+            if util.CACHE_SERVERS and util.CACHE_MODULE_NAME:
+                kw['cache_servers'] = util.CACHE_SERVERS
+                kw['cache_module_name'] = util.CACHE_MODULE_NAME
+        if 'cache_prefix' not in kw:
+            kw['cache_prefix'] = type(self).__name__ + self._testMethodName
+        if 'cache_local_dir' not in kw:
+            # Always use a persistent cache. This helps discover errors in
+            # the persistent cache.
+            # These tests run in a temporary directory that gets cleaned up, so the CWD is
+            # appropriate.
+            kw['cache_local_dir'] = '.'
+ + assert self.driver_name + options = Options(keep_history=self.keep_history, driver=self.driver_name, **kw) + adapter = self.make_adapter(options) + storage = RelStorage(adapter, options=options) + storage._batcher_row_limit = 1 + if zap: + # XXX: Some ZODB tests, possibly check4ExtStorageThread + # and check7StorageThreads don't close storages when done + # with them? This leads to connections remaining open with + # locks on PyPy, so on PostgreSQL we can't TRUNCATE tables + # and have to go the slow route. + # + # As of 2019-06-20 with PyPy 7.1.1, I'm no longer able to replicate + # a problem like that locally, so we go back to the fast way. + storage.zap_all() + return self._wrap_storage(storage) + class MockConnection(object): rolled_back = False closed = False diff --git a/src/relstorage/tests/persistentcache.py b/src/relstorage/tests/persistentcache.py new file mode 100644 index 00000000..611ad286 --- /dev/null +++ b/src/relstorage/tests/persistentcache.py @@ -0,0 +1,436 @@ +############################################################################## +# +# Copyright (c) 2019 Zope Foundation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## +""" +Test mixin for exercising persistent caching features. +""" +import time + + +import transaction + +from persistent.mapping import PersistentMapping +from ZODB.DB import DB +from ZODB.serialize import referencesf +#from ZODB.utils import p64 as int64_to_8bytes +from ZODB.utils import u64 as bytes8_to_int64 +from ZODB.utils import z64 + +from . import TestCase + + +ROOT_OID = 0 +ROOT_KEY = 'myobj' + +def find_cache(obj): + # Pass a connection, a storage, or the cache. + storage = getattr(obj, '_storage', obj) + cache = getattr(storage, '_cache', storage) + return cache + +class PersistentCacheStorageTests(TestCase): + # pylint:disable=abstract-method + + _storage = None + + def make_storage(self, *args, **kwargs): + raise NotImplementedError + + def __make_storage_pcache( + self, + expected_checkpoints=None, + expected_root_tid=None, + ): + """ + Make a storage that reads a persistent cache that + already exists. 
+ """ + storage = self.make_storage(zap=False) + cache = storage._cache + # It did read checkpoints and TID + self.assertIsNotNone(cache.checkpoints) + if expected_checkpoints: + self.assertEqual(cache.checkpoints, expected_checkpoints) + + if expected_root_tid is not None: + self.assert_oid_known(ROOT_OID, storage) + self.assertEqual(cache.delta_after0[0], expected_root_tid) + else: + self.assert_oid_not_known(ROOT_OID, storage) + self.assertIsNotNone(cache.current_tid) + return storage + + def __make_storage_no_pcache(self): + storage = self._closing( + self.make_storage(cache_local_dir=None, zap=False) + ) + # It didn't read checkpoints or TID + cache = find_cache(storage) + self.assertIsNone(cache.checkpoints) + self.assertIsNone(cache.current_tid) + return storage + + def assert_oid_not_known(self, oid, storage): + cache = find_cache(storage) + self.assertNotIn(oid, cache.delta_after0) + self.assertNotIn(oid, cache.delta_after1) + + def assert_oid_known(self, oid, storage): + cache = find_cache(storage) + self.assertIn(oid, cache.delta_after0) + return cache.delta_after0[oid] + + def assert_tid_after(self, oid, tid, storage): + cache = find_cache(storage) + new_tid = self.assert_oid_known(oid, cache) + self.assertGreater(new_tid, tid) + return new_tid + + def assert_oid_current(self, oid, storage, expected_tid=None): + cache = find_cache(storage) + + tid = self.assert_oid_known(oid, cache) + self.assertEqual(cache.current_tid, tid) + if expected_tid is not None: + self.assertEqual(tid, expected_tid) + return tid + + def assert_checkpoints(self, storage, expected_checkpoints=None): + cache = find_cache(storage) + self.assertIsNotNone(cache.checkpoints) + if expected_checkpoints: + self.assertEqual(cache.checkpoints, expected_checkpoints) + return cache.checkpoints + + def assert_cached(self, oid, tid, storage): + cache = find_cache(storage) + cache_data = cache.local_client[(oid, tid)] + __traceback_info__ = (oid, tid), [k for k in cache.local_client if k[0] == oid] + self.assertIsNotNone(cache_data) + return cache_data + + def assert_cached_exact(self, oid, tid, storage): + data = self.assert_cached(oid, tid, storage) + self.assertEqual(data[1], tid) + return data + + def assert_oid_not_cached(self, oid, storage): + cache = find_cache(storage) + keys = [k for k in cache.local_client if k[0] == oid] + __traceback_info__ = (oid, keys) + self.assertEmpty(keys) + + + def __do_sets(self, root, new_data, old_tids, old_data): + for key, value in new_data.items(): + old_tid = old_tids.get(key) + old_value = old_data.get(key) + + key_path = key.split('.') + attr_name = key_path[-1] + __traceback_info__ = key_path + base = root + name = None + for name in key_path[:-1]: + base = getattr(base, name) + oid = bytes8_to_int64(base._p_oid) + __traceback_info__ = key, oid, old_tid + if old_tid is not None: + # Opening the database loaded the root object, so it's + # now cached with the expected key; it may not actually + # be at that exact TID, though. + self.assert_cached(oid, old_tid, root._p_jar) + + if old_value is not None: + val = getattr(base, attr_name) + self.assertEqual(val, old_value) + + setattr(base, attr_name, value) + # Make sure we have something + old_tids[key] = old_tid + + def __do_check_tids(self, root, old_tids): + for key, old_tid in old_tids.items(): + key_path = key.split('.') + base = root + for name in key_path[:-1]: + base = getattr(base, name) + oid = bytes8_to_int64(base._p_oid) + + # We have a saved TID for the root object. If we had an old one, + # it's now bigger. 
+            if old_tid is not None:
+                self.assert_tid_after(oid, old_tid, root._p_jar)
+            else:
+                self.assert_oid_current(oid, root._p_jar)
+
+    def __set_keys_in_root_to(self,
+                              storage,
+                              new_data,
+                              old_tids,
+                              old_data,
+                              pack=False):
+        """
+        Set the values from *new_data* on the root object, and return
+        the transaction ID and current checkpoints.
+
+        Uses an independent transaction.
+
+        Closes *storage*.
+        """
+        db1 = self._closing(DB(storage))
+        tx = transaction.TransactionManager()
+        c1 = db1.open(tx)
+        # We've polled and gained checkpoints
+        self.assert_checkpoints(c1)
+
+        root = c1.root()
+        self.__do_sets(root, new_data, old_tids, old_data)
+        tx.commit()
+
+        checkpoints = self.assert_checkpoints(c1)
+        self.__do_check_tids(root, old_tids)
+        tid = bytes8_to_int64(c1._storage.lastTransaction())
+        c1.close()
+        if pack:
+            storage.pack(time.time(), referencesf)
+        db1.close()
+        return tid, checkpoints
+
+    def __set_key_in_root_to(self,
+                             storage,
+                             value,
+                             key=ROOT_KEY,
+                             old_tid=None,
+                             old_value=None):
+        return self.__set_keys_in_root_to(
+            storage,
+            {key: value},
+            {key: old_tid},
+            {key: old_value}
+        )
+
+    def checkNoConflictWhenChangeMissedByPersistentCacheAfterCP0(self):
+        orig_tid, orig_checkpoints = self.__set_key_in_root_to(self._storage, 42)
+
+        # A storage that will not update the persistent cache.
+        new_tid, _ = self.__set_key_in_root_to(
+            self.__make_storage_no_pcache(),
+            420
+        )
+        self.assertGreater(new_tid, orig_tid)
+
+        # Now a new storage that will read the persistent cache.
+        # It has the correct checkpoints, and the root oid is found in
+        # delta_after0, where we will poll for it.
+        storage3 = self.__make_storage_pcache(
+            orig_checkpoints,
+            expected_root_tid=orig_tid)
+
+        db3 = self._closing(DB(storage3))
+        c3 = db3.open()
+
+        # Polling in the connection, however, caught us up, because the object changed
+        # after current_tid.
+        self.assert_oid_current(ROOT_OID, c3, new_tid)
+
+        r = c3.root()
+        self.assertEqual(r.myobj, 420)
+        r.myobj = 180
+        transaction.commit()
+        c3.close()
+        db3.close()
+
+    def _populate_root_and_mapping(self):
+        tx1 = transaction.TransactionManager()
+        storage1 = self._storage
+        db1 = self._closing(DB(storage1))
+        c1 = db1.open(tx1)
+        root = c1.root()
+        root.myobj1 = mapping = PersistentMapping()
+        root.myobj = 1
+        tx1.commit()
+        c1._storage._cache.clear(load_persistent=False)
+
+        c1._storage.poll_invalidations()
+        root.myobj = 2
+        tx1.commit()
+        c1._storage._cache.clear(load_persistent=False)
+
+        c1._storage.poll_invalidations()
+        root.myobj = 3
+        tx1.commit()
+        root_tid = self.assert_oid_known(ROOT_OID, c1)
+        c1._storage._cache.clear(load_persistent=False)
+
+        # Now, mutate an object that's not the root
+        # so that we get a new transaction after the root was
+        # modified. This transaction will be included in
+        # a persistent cache.
+        c1._storage.poll_invalidations()
+        root.myobj1.key = PersistentMapping()
+        mapping_oid = mapping._p_oid
+        mapping_oid_int = bytes8_to_int64(mapping_oid)
+        tx1.commit()
+        mapping_tid = self.assert_oid_known(mapping_oid_int, c1)
+
+        self.assert_checkpoints(c1, (root_tid, root_tid))
+        self.assert_oid_current(mapping_oid_int, c1)
+
+        # the root is not in a delta
+        self.assert_oid_not_known(ROOT_OID, c1)
+        # Nor is it in the cache, because the Connection's
+        # object cache still had the root and we were never
+        # asked.
+        self.assert_oid_not_cached(ROOT_OID, c1)
+        # So let's get it in the cache with its current TID.
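+        # (``z64`` is the all-zero 8-byte OID, i.e. the root object;
+        # this load is what creates the cache entry asserted next.)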
+ c1._storage.load(z64) + self.assert_cached_exact(ROOT_OID, root_tid, c1) + + c1.close() + return root_tid, mapping_tid, db1 + + def checkNoConflictWhenChangeMissedByPersistentCacheBeforeCP1(self): + _root_tid, _mapping_tid, db = self._populate_root_and_mapping() + + # Make some changes to the root in a storage that will not + # read or update the persistent cache. + new_tid, _ = self.__set_key_in_root_to( + self.__make_storage_no_pcache(), + 420, + old_tid=None, + ) + + # Now move the persistent checkpoints forward, pushing the + # last TID for the root object out of the delta ranges. + db.storage._cache.local_client.store_checkpoints(new_tid, new_tid) + # Persist + db.close() + + # Now a new storage that will read the persistent cache + # The root object, however, was not put into a delta map. + storage3 = self.__make_storage_pcache( + expected_checkpoints=(new_tid, new_tid), + expected_root_tid=None, + ) + # Nor is it in the cache at any key. + self.assert_oid_not_cached(ROOT_OID, storage3) + + # We can successfully open and edit the root object. + self.__set_key_in_root_to(storage3, 180, old_tid=new_tid, old_value=420) + + def checkNoConflictNoChangeInPersistentCacheBeforeCP1(self): + root_tid, _mapping_tid, db = self._populate_root_and_mapping() + + # Make some changes to the sub-object, and not the root, in a storage that will not + # read or update the persistent cache. + new_tid, _ = self.__set_key_in_root_to( + self.__make_storage_no_pcache(), + 420, + key='myobj1.key', + ) + + # Now move the persistent checkpoints forward, pushing the + # last TID for the root object out of the delta ranges. + db.storage._cache.local_client.store_checkpoints(new_tid, new_tid) + # persist + db.close() + + # Now a new storage that will read the persistent cache + storage = self.__make_storage_pcache( + # With checkpoints + expected_checkpoints=(new_tid, new_tid), + # No root TID in delta after 0 + expected_root_tid=None + ) + # But it is in the cache for its old key, because we verified it + # to still be in sync. + cache_data = self.assert_cached(ROOT_OID, new_tid, storage) + self.assertEqual(cache_data[1], root_tid) + + self.__set_keys_in_root_to( + storage, + { + 'myobj': 180, + 'myobj1.key': 360, + }, + old_data={ + 'myobj': 3, + 'myobj1.key': 420 + }, + old_tids={} + ) + + def checkNoConflictWhenDeletedNotInInPersistentCacheBeforeCP1(self): + root_tid, _mapping_tid, db = self._populate_root_and_mapping() + + # Now, remove a persistent object. We do this by setting its + # key to a new persistent object. + c1 = db.open() + root = c1.root() + new_nested_mapping = PersistentMapping() + root.myobj1.key = new_nested_mapping + + mapping_oid = root.myobj1._p_oid + mapping_oid_int = bytes8_to_int64(mapping_oid) + c1.add(new_nested_mapping) + nested_mapping_oid = new_nested_mapping._p_oid + nested_mapping_oid_int = bytes8_to_int64(nested_mapping_oid) + transaction.commit() + self.assert_oid_current(nested_mapping_oid_int, c1) + + self.assert_checkpoints(c1, (root_tid, root_tid)) + + # the root is not in a delta + self.assert_oid_not_known(ROOT_OID, c1) + + # Though it is in the cache. + self.assert_cached_exact(ROOT_OID, root_tid, c1) + + # Create a new transaction that deletes an object but + # that won't update the persistent cache. + new_tid, _ = self.__set_keys_in_root_to( + self.__make_storage_no_pcache(), + {'myobj1.key': None}, + {}, + {}, + pack=True + ) + + # Now move the persistent checkpoints forward, pushing the + # last TID for the root object out of the delta ranges. 
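+        # (Setting cp0 == cp1 == new_tid leaves both delta ranges
+        # empty on restore, so every cached row at tid <= new_tid has
+        # to pass the row filter's explicit validation poll to
+        # survive.)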
+        c1._storage._cache.local_client.store_checkpoints(new_tid, new_tid)
+        # Persist
+        c1.close()
+        db.close()
+        del db, c1
+
+        # Now a new storage that will read the persistent cache
+        storage = self.__make_storage_pcache(
+            expected_checkpoints=(new_tid, new_tid),
+        )
+        # The deleted object was not put in a delta map
+        self.assert_oid_not_known(nested_mapping_oid_int, storage)
+        # Nor is it in a cache at the old key
+        self.assert_oid_not_cached(nested_mapping_oid_int, storage)
+
+        # Likewise, the parent mapping isn't found anywhere, because it
+        # changed
+        self.assert_oid_not_known(mapping_oid_int, storage)
+        self.assert_oid_not_cached(mapping_oid_int, storage)
+
+        self.__set_keys_in_root_to(
+            storage,
+            {'myobj': 180, 'myobj1.key': 360},
+            {'': root_tid},
+            {'myobj': 3, 'myobj1.key': None}
+        )
diff --git a/src/relstorage/tests/reltestbase.py b/src/relstorage/tests/reltestbase.py
index b208cc96..eb6fa0b9 100644
--- a/src/relstorage/tests/reltestbase.py
+++ b/src/relstorage/tests/reltestbase.py
@@ -16,7 +16,6 @@
 # pylint:disable=too-many-ancestors,abstract-method,too-many-public-methods,too-many-lines
 # pylint:disable=too-many-statements,too-many-locals

-import abc
 import os
 import random
 import shutil
@@ -34,8 +33,6 @@
 from ZODB.FileStorage import FileStorage
 from ZODB.POSException import ReadConflictError
 from ZODB.serialize import referencesf
-from ZODB.utils import u64
-from ZODB.utils import z64
 from ZODB.tests import BasicStorage
 from ZODB.tests import ConflictResolution
 from ZODB.tests import MTStorage
@@ -47,114 +44,23 @@
 from ZODB.tests.StorageTestBase import zodb_pickle
 from ZODB.tests.StorageTestBase import zodb_unpickle

-from relstorage.options import Options
-from relstorage.storage import RelStorage
-from relstorage._compat import ABC
-
 from . import fakecache
 from . import util
 from . import mock
+from . import TestCase
+from . import StorageCreatingMixin
+from .persistentcache import PersistentCacheStorageTests
 from .test_zodbconvert import FSZODBConvertTests

-class StorageCreatingMixin(ABC):
-
-    keep_history = None # Override
-    driver_name = None # Override.
-
-    @abc.abstractmethod
-    def make_adapter(self, options):
-        # abstract method
-        raise NotImplementedError
-
-    @abc.abstractmethod
-    def get_adapter_class(self):
-        raise NotImplementedError
-
-    @abc.abstractmethod
-    def get_adapter_zconfig(self):
-        """
-        Return the part of the ZConfig string that makes the adapter.
-
-        That is, return the <postgresql>, <mysql> or <oracle> section.
-
-        Return text (unicode).
-        """
-        raise NotImplementedError
-
-    def get_adapter_zconfig_replica_conf(self):
-        return os.path.join(os.path.dirname(__file__), 'replicas.conf')
-
-    @abc.abstractmethod
-    def verify_adapter_from_zconfig(self, adapter):
-        """
-        Assert that the adapter configured from get_adapter_zconfig
-        is properly configured.
-        """
-        raise NotImplementedError
-
-    def _wrap_storage(self, storage):
-        return storage
-
-    def make_storage(self, zap=True, **kw):
-        if ('cache_servers' not in kw
-                and 'cache_module_name' not in kw
-                and kw.get('share_local_cache', True)):
-            if util.CACHE_SERVERS and util.CACHE_MODULE_NAME:
-                kw['cache_servers'] = util.CACHE_SERVERS
-                kw['cache_module_name'] = util.CACHE_MODULE_NAME
-        if 'cache_prefix' not in kw:
-            kw['cache_prefix'] = type(self).__name__ + self._testMethodName
-        if 'cache_local_dir' not in kw:
-            # Always use a persistent cache. This helps discover errors in
-            # the persistent cache.
-            # These tests run in a temporary directory that gets cleaned up, so the CWD is
-            # appropriate.
-            kw['cache_local_dir'] = '.'
- - assert self.driver_name - options = Options(keep_history=self.keep_history, driver=self.driver_name, **kw) - adapter = self.make_adapter(options) - storage = RelStorage(adapter, options=options) - storage._batcher_row_limit = 1 - if zap: - # XXX: Some ZODB tests, possibly check4ExtStorageThread - # and check7StorageThreads don't close storages when done - # with them? This leads to connections remaining open with - # locks on PyPy, so on PostgreSQL we can't TRUNCATE tables - # and have to go the slow route. - # - # As of 2019-06-20 with PyPy 7.1.1, I'm no longer able to replicate - # a problem like that locally, so we go back to the fast way. - storage.zap_all() - return self._wrap_storage(storage) - - class RelStorageTestBase(StorageCreatingMixin, + TestCase, StorageTestBase.StorageTestBase): base_dbname = None # Override keep_history = None # Override _storage_created = None - __to_close = () - - def setUp(self): - # This sets up a temporary directory for each test and - # changes to it. - super(RelStorageTestBase, self).setUp() - self.__to_close = [] - - def _closing(self, o): - """ - Close the object before tearDown (opposite of addCleanup - so that exceptions will propagate). - - Returns the given object. - """ - self.__to_close.append(o) - return o - def _close(self): # Override from StorageTestBase. @@ -169,13 +75,6 @@ def _close(self): storage.close() storage.cleanup() - def tearDown(self): - transaction.abort() - for x in reversed(self.__to_close): - x.close() - self.__to_close = () - super(RelStorageTestBase, self).tearDown() - def make_storage_to_cache(self): return self.make_storage() @@ -210,6 +109,7 @@ def open(self, read_only=False): class GenericRelStorageTests( RelStorageTestBase, + PersistentCacheStorageTests, BasicStorage.BasicStorage, PackableStorage.PackableStorage, Synchronization.SynchronizedStorage, @@ -219,434 +119,6 @@ class GenericRelStorageTests( ReadOnlyStorage.ReadOnlyStorage, ): - def checkNoConflictWhenChangeMissedByPersistentCacheAfterCP0(self): - storage1 = self._storage - db1 = self._closing(DB(storage1)) - c1 = db1.open() - c1.root()['myobj'] = 42 - transaction.commit() - orig_checkpoints = c1._storage._cache.checkpoints - self.assertIsNotNone(orig_checkpoints) - # We have a saved TID for the root object. - self.assertIn(0, c1._storage._cache.delta_after0) - tid = c1._storage._cache.delta_after0[0] - __traceback_info__ = tid - self.assertEqual(c1._storage._cache.current_tid, tid) - c1.close() - db1.close() - del db1, c1, storage1 - - # A storage that will not update the persistent cache. 
- storage2 = self.make_storage(cache_local_dir=None, zap=False) - # It didn't read checkpoints or TID - self.assertIsNone(storage2._cache.checkpoints) - self.assertIsNone(storage2._cache.current_tid) - db2 = self._closing(DB(storage2)) - c2 = db2.open() - # We've polled and gained checkpoints - self.assertIsNotNone(c2._storage._cache.checkpoints) - c2.root()['myobj'] = 420 - transaction.commit() - # The tid changed - self.assertIn(0, c2._storage._cache.delta_after0) - new_tid = c2._storage._cache.delta_after0[0] - self.assertGreater(new_tid, tid) - c2.close() - db2.close() - del db2, storage2, c2 - - # Now a new storage that will read the persistent cache - storage3 = self.make_storage(zap=False) - # It did read checkpoints and TID - self.assertIsNotNone(storage3._cache.checkpoints) - self.assertEqual(storage3._cache.checkpoints, orig_checkpoints) - self.assertIn(0, storage3._cache.delta_after0) - self.assertEqual(storage3._cache.delta_after0[0], tid) - self.assertIsNotNone(storage3._cache.current_tid) - db3 = self._closing(DB(storage3)) - c3 = db3.open() - # Polling, however, caught us up, because the object changed - # after current_tid. - self.assertIn(0, c3._storage._cache.delta_after0) - self.assertEqual(c3._storage._cache.delta_after0[0], new_tid) - - r = c3.root() - self.assertEqual(r['myobj'], 420) - c3.root()['myobj'] = 180 - transaction.commit() - c3.close() - db3.close() - - def checkNoConflictWhenChangeMissedByPersistentCacheBeforeCP1(self): - tx1 = transaction.TransactionManager() - storage1 = self._storage - db1 = self._closing(DB(storage1)) - c1 = db1.open(tx1) - c1.root()['myobj1'] = mapping = PersistentMapping() - c1.root()['myobj'] = 1 - tx1.commit() - _ = c1._storage._cache.delta_after0[0] - c1._storage._cache.clear(load_persistent=False) - - c1._storage.poll_invalidations() - c1.root()['myobj'] = 2 - tx1.commit() - _ = c1._storage._cache.delta_after0[0] - c1._storage._cache.clear(load_persistent=False) - - c1._storage.poll_invalidations() - c1.root()['myobj'] = 3 - tx1.commit() - tid3 = c1._storage._cache.delta_after0[0] - c1._storage._cache.clear(load_persistent=False) - - # Now, mutate an object that's not the root - # so that we get a new transaction after the root was - # modified. - c1._storage.poll_invalidations() - c1.root()['myobj1']['key'] = 1 - mapping_oid = mapping._p_oid - mapping_oid_int = u64(mapping_oid) - tx1.commit() - tid4 = c1._storage._cache.delta_after0[mapping_oid_int] - - orig_checkpoints = c1._storage._cache.checkpoints - self.assertIsNotNone(orig_checkpoints) - self.assertEqual(orig_checkpoints, (tid3, tid3)) - self.assertEqual(c1._storage._cache.current_tid, tid4) - - # the root is not in a delta - self.assertNotIn(0, c1._storage._cache.delta_after0) - # Nor is it in the cache, because the Connection's - # object cache still had the root and we were never - # asked. - cache_data = c1._storage._cache.local_client[(0, tid3)] - __traceback_info__ = list(c1._storage._cache.local_client) - self.assertIsNone(cache_data) - # So lets get it in the cache with its current TID. - c1._storage.load(z64) - - cache_data = c1._storage._cache.local_client[(0, tid3)] - __traceback_info__ = list(c1._storage._cache.local_client) - self.assertIsNotNone(cache_data) - self.assertEqual(cache_data[1], tid3) - - # Make some changes to the root in a storage that will not - # update the persistent cache. 
- storage2 = self.make_storage(cache_local_dir=None, zap=False) - # It didn't read checkpoints or TID - self.assertIsNone(storage2._cache.checkpoints) - self.assertIsNone(storage2._cache.current_tid) - db2 = self._closing(DB(storage2)) - tx2 = transaction.TransactionManager() - c2 = db2.open(tx2) - # We've polled and gained checkpoints - self.assertIsNotNone(c2._storage._cache.checkpoints) - c2.root()['myobj'] = 420 - tx2.commit() - # The tid changed - self.assertIn(0, c2._storage._cache.delta_after0) - new_tid = c2._storage._cache.delta_after0[0] - self.assertGreater(new_tid, tid4) - c2.close() - db2.close() - del db2, storage2, c2 - - # Now move the persistent checkpoints forward, pushing the - # last TID for the root object out of the delta ranges. - c1._storage._cache.local_client.store_checkpoints(new_tid, new_tid) - - c1.close() - db1.close() - del db1, c1, storage1 - - # Now a new storage that will read the persistent cache - storage3 = self.make_storage(zap=False) - # It did read checkpoints and TID - self.assertIsNotNone(storage3._cache.checkpoints) - self.assertEqual(storage3._cache.checkpoints, (new_tid, new_tid)) - self.assertEqual(storage3._cache.current_tid, new_tid) - # The root object, however, was not put into a delta map. - self.assertNotIn(0, storage3._cache.delta_after0) - # Nor is it in the cache at any key. - keys_for_root = [k for k in storage3._cache.local_client if k[0] == 0] - self.assertEqual(0, len(keys_for_root)) - db3 = self._closing(DB(storage3)) - tx3 = transaction.TransactionManager() - c3 = db3.open(tx3) - # Polling did not find the change. We think we're current with new_tid, - # and the root changed in that transaction. - # XXX: MySQL on Travis, but only there, not on appveyor and not locally, - # has a 0 in here. Why? - # self.assertNotIn(0, c3._storage._cache.delta_after0) - - # Opening the database loaded the root object, so it's now in the cache, - # with accurate data. - cache_data = c3._storage._cache.local_client[(0, new_tid)] - self.assertIsNotNone(cache_data) - __traceback_info__ = tid3, tid4, new_tid - self.assertEqual(cache_data[1], new_tid) - r = c3.root() - # The current data is visible. - self.assertEqual(r['myobj'], 420) - c3.root()['myobj'] = 180 - tx3.commit() - c3.close() - db3.close() - - def checkNoConflictNoChangeInPersistentCacheBeforeCP1(self): - tx1 = transaction.TransactionManager() - storage1 = self._storage - db1 = self._closing(DB(storage1)) - c1 = db1.open(tx1) - c1.root()['myobj1'] = mapping = PersistentMapping() - c1.root()['myobj'] = 1 - tx1.commit() - _ = c1._storage._cache.delta_after0[0] - c1._storage._cache.clear(load_persistent=False) - - c1._storage.poll_invalidations() - c1.root()['myobj'] = 2 - tx1.commit() - _ = c1._storage._cache.delta_after0[0] - c1._storage._cache.clear(load_persistent=False) - - c1._storage.poll_invalidations() - c1.root()['myobj'] = 3 - tx1.commit() - last_root_tid_change = tid3 = c1._storage._cache.delta_after0[0] - c1._storage._cache.clear(load_persistent=False) - - # Now, mutate an object that's not the root - # so that we get a new transaction after the root was - # modified. 
- c1._storage.poll_invalidations() - c1.root()['myobj1']['key'] = 1 - mapping_oid = mapping._p_oid - mapping_oid_int = u64(mapping_oid) - tx1.commit() - tid4 = c1._storage._cache.delta_after0[mapping_oid_int] - - orig_checkpoints = c1._storage._cache.checkpoints - self.assertIsNotNone(orig_checkpoints) - self.assertEqual(orig_checkpoints, (tid3, tid3)) - self.assertEqual(c1._storage._cache.current_tid, tid4) - - # the root is not in a delta - self.assertNotIn(0, c1._storage._cache.delta_after0) - # Nor is it in the cache, because the Connection's - # object cache still had the root and we were never - # asked. - cache_data = c1._storage._cache.local_client[(0, tid3)] - __traceback_info__ = list(c1._storage._cache.local_client) - self.assertIsNone(cache_data) - # So lets get it in the cache with its current TID. - c1._storage.load(z64) - - cache_data = c1._storage._cache.local_client[(0, tid3)] - __traceback_info__ = list(c1._storage._cache.local_client) - self.assertIsNotNone(cache_data) - self.assertEqual(cache_data[1], tid3) - - # Create a new transaction that doesn't change the root in a storage - # that won't update the persistent cache. - storage2 = self.make_storage(cache_local_dir=None, zap=False) - # It didn't read checkpoints or TID - self.assertIsNone(storage2._cache.checkpoints) - self.assertIsNone(storage2._cache.current_tid) - db2 = self._closing(DB(storage2)) - tx2 = transaction.TransactionManager() - c2 = db2.open(tx2) - # We've polled and gained checkpoints - self.assertIsNotNone(c2._storage._cache.checkpoints) - c2.root()['myobj1']['key'] = 420 - tx2.commit() - # The tid changed - self.assertIn(mapping_oid_int, c2._storage._cache.delta_after0) - new_tid = c2._storage._cache.delta_after0[mapping_oid_int] - self.assertGreater(new_tid, tid4) - c2.close() - db2.close() - del db2, storage2, c2 - - # Now move the persistent checkpoints forward, pushing the - # last TID for the root object out of the delta ranges. - c1._storage._cache.local_client.store_checkpoints(new_tid, new_tid) - - c1.close() - db1.close() - del db1, c1, storage1 - - # Now a new storage that will read the persistent cache - storage3 = self.make_storage(zap=False) - # It did read checkpoints and TID - self.assertIsNotNone(storage3._cache.checkpoints) - self.assertEqual(storage3._cache.checkpoints, (new_tid, new_tid)) - self.assertEqual(storage3._cache.current_tid, new_tid) - # The root object, however, was not put into a delta map. - self.assertNotIn(0, storage3._cache.delta_after0) - # But it is in the cache for its old key, because we verified it - # to still be in sync. - keys_for_root = [k for k in storage3._cache.local_client if k[0] == 0] - self.assertEqual(1, len(keys_for_root)) - self.assertEqual((0, new_tid), keys_for_root[0]) - db3 = self._closing(DB(storage3)) - tx3 = transaction.TransactionManager() - c3 = db3.open(tx3) - - # Opening the database loaded the root object, and it continues - # to be found at its old cache key. - cache_data = c3._storage._cache.local_client[(0, new_tid)] - self.assertIsNotNone(cache_data) - __traceback_info__ = tid3, tid4, new_tid - self.assertEqual(cache_data[1], last_root_tid_change) - r = c3.root() - # The current data is visible. 
- self.assertEqual(r['myobj'], 3) - self.assertEqual(r['myobj1']['key'], 420) - c3.root()['myobj'] = 180 - r['myobj1']['key'] = 360 - tx3.commit() - c3.close() - db3.close() - - def checkNoConflictWhenDeletedNotInInPersistentCacheBeforeCP1(self): - tx1 = transaction.TransactionManager() - storage1 = self._storage - db1 = self._closing(DB(storage1)) - c1 = db1.open(tx1) - c1.root()['myobj1'] = mapping = PersistentMapping() - mapping['key'] = PersistentMapping() - c1.root()['myobj'] = 1 - tx1.commit() - _ = c1._storage._cache.delta_after0[0] - c1._storage._cache.clear(load_persistent=False) - - c1._storage.poll_invalidations() - c1.root()['myobj'] = 2 - tx1.commit() - _ = c1._storage._cache.delta_after0[0] - c1._storage._cache.clear(load_persistent=False) - - c1._storage.poll_invalidations() - c1.root()['myobj'] = 3 - tx1.commit() - last_root_tid_change = tid3 = c1._storage._cache.delta_after0[0] - c1._storage._cache.clear(load_persistent=False) - - # Now, mutate an object that's not the root - # so that we get a new transaction after the root was - # modified. - c1._storage.poll_invalidations() - nested_mapping = c1.root()['myobj1']['key'] = PersistentMapping() - mapping_oid = mapping._p_oid - mapping_oid_int = u64(mapping_oid) - c1.add(nested_mapping) - nested_mapping_oid = nested_mapping._p_oid - nested_mapping_oid_int = u64(nested_mapping_oid) - tx1.commit() - tid4 = c1._storage._cache.delta_after0[mapping_oid_int] - - orig_checkpoints = c1._storage._cache.checkpoints - self.assertIsNotNone(orig_checkpoints) - self.assertEqual(orig_checkpoints, (tid3, tid3)) - self.assertEqual(c1._storage._cache.current_tid, tid4) - - # the root is not in a delta - self.assertNotIn(0, c1._storage._cache.delta_after0) - # Nor is it in the cache, because the Connection's - # object cache still had the root and we were never - # asked. - cache_data = c1._storage._cache.local_client[(0, tid3)] - __traceback_info__ = list(c1._storage._cache.local_client) - self.assertIsNone(cache_data) - # So lets get it in the cache with its current TID. - c1._storage.load(z64) - - cache_data = c1._storage._cache.local_client[(0, tid3)] - __traceback_info__ = list(c1._storage._cache.local_client) - self.assertIsNotNone(cache_data) - self.assertEqual(cache_data[1], tid3) - - # Create a new transaction that deletes an object but - # that won't update the persistent cache. - storage2 = self.make_storage(cache_local_dir=None, zap=False) - # It didn't read checkpoints or TID - self.assertIsNone(storage2._cache.checkpoints) - self.assertIsNone(storage2._cache.current_tid) - db2 = self._closing(DB(storage2)) - tx2 = transaction.TransactionManager() - c2 = db2.open(tx2) - # We've polled and gained checkpoints - self.assertIsNotNone(c2._storage._cache.checkpoints) - del c2.root()['myobj1']['key'] - tx2.commit() - # The tid changed - self.assertIn(mapping_oid_int, c2._storage._cache.delta_after0) - new_tid = c2._storage._cache.delta_after0[mapping_oid_int] - self.assertGreater(new_tid, tid4) - c2.close() - db2.close() - # In order to actually remove this object, we must pack the storage. - t = time.time() - storage2.pack(t, ZODB.serialize.referencesf) - - del db2, storage2, c2 - - # Now move the persistent checkpoints forward, pushing the - # last TID for the root object out of the delta ranges. 
- c1._storage._cache.local_client.store_checkpoints(new_tid, new_tid) - - c1.close() - db1.close() - del db1, c1, storage1 - - # Now a new storage that will read the persistent cache - storage3 = self.make_storage(zap=False) - # It did read checkpoints and TID - self.assertIsNotNone(storage3._cache.checkpoints) - self.assertEqual(storage3._cache.checkpoints, (new_tid, new_tid)) - self.assertEqual(storage3._cache.current_tid, new_tid) - # The deleted object was not put in a delta map - self.assertNotIn(nested_mapping_oid_int, storage3._cache.delta_after0) - self.assertNotIn(nested_mapping_oid_int, storage3._cache.delta_after1) - # Nor is it in a cache at the old key - keys_for_nested = [k for k in storage3._cache.local_client - if k[0] == nested_mapping_oid_int] - __traceback_info__ = nested_mapping_oid_int, new_tid, keys_for_nested - self.assertEqual(0, len(keys_for_nested)) - - # Likewise, the parent mapping isn't found anywhere, because it - # changed - self.assertNotIn(mapping_oid_int, storage3._cache.delta_after0) - self.assertNotIn(mapping_oid_int, storage3._cache.delta_after1) - # Nor is it in a cache at the old key - keys_for_mapping = [k for k in storage3._cache.local_client - if k[0] == mapping_oid_int] - self.assertEqual(0, len(keys_for_mapping)) - - db3 = self._closing(DB(storage3)) - tx3 = transaction.TransactionManager() - c3 = db3.open(tx3) - - # Opening the database loaded the root object, and it continues - # to be found at its old cache key. - cache_data = c3._storage._cache.local_client[(0, new_tid)] - self.assertIsNotNone(cache_data) - __traceback_info__ = tid3, tid4, new_tid - self.assertEqual(cache_data[1], last_root_tid_change) - r = c3.root() - # The current data is visible. - self.assertEqual(r['myobj'], 3) - self.assertNotIn('key', r['myobj1']) - c3.root()['myobj'] = 180 - r['myobj1']['key'] = 360 - tx3.commit() - # The parent mapping is now in the cache - self.assertIn(mapping_oid_int, c3._storage._cache.delta_after0) - c3.close() - db3.close() - def checkCurrentObjectTidsRoot(self): # Get the root object in place db = self._closing(DB(self._storage)) @@ -1339,19 +811,14 @@ def checkBTreesLengthStress(self): c.close() def updater(): - thread_storage = self._storage.new_instance() - thread_db = DB(thread_storage) - try: - for _ in range(updates_per_thread): - thread_c = thread_db.open() - try: - thread_c.root()['length'].change(1) - time.sleep(random.random() * 0.05) - transaction.commit() - finally: - thread_c.close() - finally: - thread_storage.close() + for _ in range(updates_per_thread): + thread_c = db.open() + try: + thread_c.root()['length'].change(1) + time.sleep(random.random() * 0.05) + transaction.commit() + finally: + thread_c.close() import threading threads = []