Fixed the "potential ZODB cache inconsistency after client reconnect"
problem reported on zodb-dev:

http://mail.zope.org/pipermail/zodb-dev/2006-August/010343.html

Added a new invalidateCache protocol for DBs and Connections to
invalidate the entire in-memory caches.  This is used when ZEO clients
reconnect.
Jim Fulton committed Aug 16, 2006
1 parent 084d981 commit 4413420
Showing 8 changed files with 249 additions and 29 deletions.
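
Before the file-by-file diffs, here is a minimal, hypothetical sketch of the invalidateCache protocol that the commit message describes. It is not the ZODB/ZEO code itself: the ToyConnection, ToyDB, and ToyClientStorage classes and their read/newTransaction helpers are invented for illustration and only loosely mirror the real Connection, DB, and ClientStorage changes shown below.

# A toy model of the invalidateCache protocol added by this commit -- not the
# real ZODB/ZEO classes.  It illustrates the call chain:
#   ClientStorage.notifyConnected -> DB.invalidateCache -> Connection.invalidateCache

class ToyConnection:
    def __init__(self):
        self._invalidatedCache = False          # flag the commit adds to Connection
        self._cache = {'root': 'possibly stale state'}

    def invalidateCache(self):
        # Mark the entire in-memory cache stale; reads fail until the next
        # transaction boundary flushes the cache.
        self._invalidatedCache = True

    def read(self, oid):
        if self._invalidatedCache:
            raise RuntimeError("read conflict: cache was invalidated")
        return self._cache[oid]

    def newTransaction(self):
        # Rough analogue of _flush_invalidations: drop everything and reset.
        if self._invalidatedCache:
            self._cache.clear()
            self._invalidatedCache = False

class ToyDB:
    def __init__(self):
        self.connections = [ToyConnection()]

    def invalidateCache(self):
        # Fan the invalidation out to every open connection's cache.
        for c in self.connections:
            c.invalidateCache()

class ToyClientStorage:
    def __init__(self, db):
        self._db = db

    def notifyConnected(self, conn):
        # On (re)connect, anything cached from the previous server session
        # may be stale, so invalidate all of the db's connection caches.
        if self._db is not None:
            self._db.invalidateCache()

if __name__ == '__main__':
    db = ToyDB()
    storage = ToyClientStorage(db)
    storage.notifyConnected(conn=None)    # simulate a ZEO client reconnect
    try:
        db.connections[0].read('root')    # reads now fail ...
    except RuntimeError as e:
        print(e)
    db.connections[0].newTransaction()    # ... until a new transaction starts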
4 changes: 4 additions & 0 deletions src/ZEO/ClientStorage.py
@@ -460,6 +460,10 @@ def notifyConnected(self, conn):
            # this method before it was stopped.
            return

        # invalidate our db cache
        if self._db is not None:
            self._db.invalidateCache()

        # TODO: report whether we get a read-only connection.
        if self._connection is not None:
            reconnect = 1
3 changes: 3 additions & 0 deletions src/ZEO/tests/ConnectionTests.py
@@ -79,6 +79,9 @@ class DummyDB:
    def invalidate(self, *args, **kwargs):
        pass

    def invalidateCache(self):
        pass

class CommonSetupTearDown(StorageTestBase):
    """Common boilerplate"""

48 changes: 48 additions & 0 deletions src/ZEO/tests/testZEO.py
@@ -306,6 +306,53 @@ def writable(self):
        self.assert_('exc_info' in log[0][1])
        self.assertEqual(log[1][0], "Couldn't close a dispatcher.")
        self.assert_('exc_info' in log[1][1])

class ConnectionInvalidationOnReconnect(
    ZEO.tests.ConnectionTests.CommonSetupTearDown):
    """Test that the db cache is invalidated when the client reconnects.
    """

    def getConfig(self, path, create, read_only):
        return """<mappingstorage 1/>"""

    def checkConnectionInvalidationOnReconnect(self):

        storage = ClientStorage(self.addr, wait=1, min_disconnect_poll=0.1)
        self._storage = storage

        # and we'll wait for the storage to be reconnected:
        for i in range(100):
            if storage.is_connected():
                break
            time.sleep(0.1)
        else:
            raise AssertionError("Couldn't connect to server")

        class DummyDB:
            _invalidatedCache = 0
            def invalidateCache(self):
                self._invalidatedCache += 1
            def invalidate(*a, **k):
                pass

        db = DummyDB()
        storage.registerDB(db, None)

        base = db._invalidatedCache

        # Now we'll force a disconnection and reconnection
        storage._connection.close()

        # and we'll wait for the storage to be reconnected:
        for i in range(100):
            if storage.is_connected():
                break
            time.sleep(0.1)
        else:
            raise AssertionError("Couldn't connect to server")

        # Now the db's cache should have been invalidated:
        self.assertEqual(db._invalidatedCache, base+1)


class DemoStorageWrappedAroundClientStorage(DemoStorageWrappedBase):
@@ -345,6 +392,7 @@ def tearDown(self):
    DemoStorageWrappedAroundClientStorage,
    HeartbeatTests,
    CatastrophicClientLoopFailure,
    ConnectionInvalidationOnReconnect,
    ]

def test_suite():
22 changes: 22 additions & 0 deletions src/ZODB/Connection.py
@@ -159,6 +159,9 @@ def __init__(self, db, version='', cache_size=400):
        self._inv_lock = threading.Lock()
        self._invalidated = {}

        # Flag indicating whether the cache has been invalidated:
        self._invalidatedCache = False

        # We intend to prevent committing a transaction in which
        # ReadConflictError occurs. _conflicts is the set of oids that
        # experienced ReadConflictError. Any time we raise ReadConflictError,
@@ -311,6 +314,14 @@ def invalidate(self, tid, oids):
        finally:
            self._inv_lock.release()

    def invalidateCache(self):
        self._inv_lock.acquire()
        try:
            self._invalidatedCache = True
        finally:
            self._inv_lock.release()


    def root(self):
        """Return the database root object."""
        return self.get(z64)
@@ -473,6 +484,9 @@ def _flush_invalidations(self):
            invalidated = self._invalidated
            self._invalidated = {}
            self._txn_time = None
            if self._invalidatedCache:
                self._invalidatedCache = False
                invalidated = self._cache.cache_data.copy()
        finally:
            self._inv_lock.release()

@@ -524,6 +538,9 @@ def _commit(self, transaction):

        self._added_during_commit = []

        if self._invalidatedCache:
            raise ConflictError()

        for obj in self._registered_objects:
            oid = obj._p_oid
            assert oid
@@ -782,6 +799,10 @@ def _setstate(self, obj):
        # dict update could go on in another thread, but we don't care
        # because we have to check again after the load anyway.


        if self._invalidatedCache:
            raise ReadConflictError()

        if (obj._p_oid in self._invalidated and
            not myhasattr(obj, "_p_independent")):
            # If the object has _p_independent(), we will handle it below.
@@ -970,6 +991,7 @@ def _resetCache(self):
"""
self._reset_counter = global_reset_counter
self._invalidated.clear()
self._invalidatedCache = False
cache_size = self._cache.cache_size
self._cache = cache = PickleCache(self, cache_size)

6 changes: 6 additions & 0 deletions src/ZODB/DB.py
@@ -480,6 +480,12 @@ def inval(c):
                c.invalidate(tid, oids)
        self._connectionMap(inval)

    def invalidateCache(self):
        """Invalidate each of the connection caches
        """
        self._miv_cache.clear()
        self._connectionMap(lambda c: c.invalidateCache())

    def modifiedInVersion(self, oid):
        h = hash(oid) % 131
        cache = self._miv_cache
73 changes: 45 additions & 28 deletions src/ZODB/interfaces.py
@@ -283,40 +283,51 @@ def getTransferCounts(clear=False):
        If clear is True, reset the counters.
        """

    def invalidateCache():
        """Invalidate the connection cache

        This invalidates *all* objects in the cache. If the connection
        is open, subsequent reads will fail until a new transaction
        begins or until the connection is reopened.
        """

class IDatabase(Interface):
    """ZODB DB.

    TODO: This interface is incomplete.
    """
    def __init__(storage,
                 pool_size=7,
                 cache_size=400,
                 version_pool_size=3,
                 version_cache_size=100,
                 database_name='unnamed',
                 databases=None,
                 ):
        """Create an object database.

        storage: the storage used by the database, e.g. FileStorage
        pool_size: expected maximum number of open connections
        cache_size: target size of Connection object cache, in number of
          objects
        version_pool_size: expected maximum number of connections (per
          version)
        version_cache_size: target size of Connection object cache for
          version connections, in number of objects
        database_name: when using a multi-database, the name of this DB
          within the database group. It's a (detected) error if databases
          is specified too and database_name is already a key in it.
          This becomes the value of the DB's database_name attribute.
        databases: when using a multi-database, a mapping to use as the
          binding of this DB's .databases attribute. It's intended
          that the second and following DB's added to a multi-database
          pass the .databases attribute set on the first DB added to the
          collection.
        """
## __init__ methods don't belong in interfaces:
##
## def __init__(storage,
##              pool_size=7,
##              cache_size=400,
##              version_pool_size=3,
##              version_cache_size=100,
##              database_name='unnamed',
##              databases=None,
##              ):
##     """Create an object database.
##
##     storage: the storage used by the database, e.g. FileStorage
##     pool_size: expected maximum number of open connections
##     cache_size: target size of Connection object cache, in number of
##       objects
##     version_pool_size: expected maximum number of connections (per
##       version)
##     version_cache_size: target size of Connection object cache for
##       version connections, in number of objects
##     database_name: when using a multi-database, the name of this DB
##       within the database group. It's a (detected) error if databases
##       is specified too and database_name is already a key in it.
##       This becomes the value of the DB's database_name attribute.
##     databases: when using a multi-database, a mapping to use as the
##       binding of this DB's .databases attribute. It's intended
##       that the second and following DB's added to a multi-database
##       pass the .databases attribute set on the first DB added to the
##       collection.
##     """

    databases = Attribute("""\
        A mapping from database name to DB (database) object.
@@ -328,6 +339,12 @@ def __init__(storage,
        entry.
        """)

    def invalidateCache():
        """Invalidate all objects in the database object caches

        invalidateCache will be called on each of the database's connections.
        """

class IStorage(Interface):
    """A storage is responsible for storing and retrieving data of objects.
    """
74 changes: 74 additions & 0 deletions src/ZODB/tests/testConnection.py
@@ -478,6 +478,80 @@ def test_invalidate(self):
"""

def test_invalidateCache():
    """\
The invalidateCache method invalidates a connection's cache. It also
prevents reads until the end of a transaction.

>>> from ZODB.tests.util import DB
>>> import transaction
>>> db = DB()
>>> tm = transaction.TransactionManager()
>>> connection = db.open(transaction_manager=tm)
>>> connection.root()['a'] = StubObject()
>>> connection.root()['a'].x = 1
>>> connection.root()['b'] = StubObject()
>>> connection.root()['b'].x = 1
>>> connection.root()['c'] = StubObject()
>>> connection.root()['c'].x = 1
>>> tm.commit()
>>> connection.root()['b']._p_deactivate()
>>> connection.root()['c'].x = 2

So we have a connection and an active transaction with some
modifications. Let's call invalidateCache:

>>> connection.invalidateCache()

Now, if we try to load an object, we'll get a read conflict:

>>> connection.root()['b'].x
Traceback (most recent call last):
...
ReadConflictError: database read conflict error

If we try to commit the transaction, we'll get a conflict error:

>>> tm.commit()
Traceback (most recent call last):
...
ConflictError: database conflict error

and the cache will have been cleared:

>>> print connection.root()['a']._p_changed
None
>>> print connection.root()['b']._p_changed
None
>>> print connection.root()['c']._p_changed
None

But we'll be able to access data again:

>>> connection.root()['b'].x
1

Aborting a transaction after a read conflict also lets us read data
and go on about our business:

>>> connection.invalidateCache()

>>> connection.root()['c'].x
Traceback (most recent call last):
...
ReadConflictError: database read conflict error

>>> tm.abort()
>>> connection.root()['c'].x
1

>>> connection.root()['c'].x = 2
>>> tm.commit()

>>> db.close()
"""

# ---- stubs

class StubObject(Persistent):
48 changes: 47 additions & 1 deletion src/ZODB/tests/testDB.py
@@ -18,6 +18,8 @@

import transaction

from zope.testing import doctest

import ZODB
import ZODB.FileStorage

@@ -130,5 +132,49 @@ def test_removeVersionPool_while_connection_open(self):
        self.assertEqual(nconn(pools), 3)


def test_invalidateCache():
"""\
The invalidateCache method invalidates the object caches of all of the
connections attached to a database.

>>> from ZODB.tests.util import DB
>>> import transaction
>>> db = DB()
>>> tm1 = transaction.TransactionManager()
>>> c1 = db.open(transaction_manager=tm1)
>>> c1.root()['a'] = MinPO(1)
>>> tm1.commit()
>>> tm2 = transaction.TransactionManager()
>>> c2 = db.open(transaction_manager=tm2)
>>> c1.root()['a']._p_deactivate()
>>> tm3 = transaction.TransactionManager()
>>> c3 = db.open(transaction_manager=tm3)
>>> c3.root()['a'].value
1
>>> c3.close()
>>> db.invalidateCache()
>>> c1.root()['a'].value
Traceback (most recent call last):
...
ReadConflictError: database read conflict error
>>> c2.root()['a'].value
Traceback (most recent call last):
...
ReadConflictError: database read conflict error
>>> c3 is db.open(transaction_manager=tm3)
True
>>> print c3.root()['a']._p_changed
None
>>> db.close()
"""



def test_suite():
    return unittest.makeSuite(DBTests)
    s = unittest.makeSuite(DBTests)
    s.addTest(doctest.DocTestSuite())
    return s
