Skip to content

Commit

Permalink
Use volatile attribute of instance for buffering cache values
Browse files Browse the repository at this point in the history
  • Loading branch information
andbag committed Jul 9, 2018
1 parent 76f6c59 commit 8eb64f2
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 27 deletions.
36 changes: 18 additions & 18 deletions src/Products/ZCatalog/cache.py
Expand Up @@ -25,8 +25,8 @@

from ZODB.utils import p64, z64
from ZODB.POSException import ReadConflictError
from functools import wraps
from six.moves._thread import allocate_lock
from six.moves._thread import _local

LOG = logging.getLogger('Zope.ZCatalog.cache')
_marker = (z64, z64)
Expand Down Expand Up @@ -67,6 +67,13 @@ def make_key(self, query):

catalog = self.catalog

# check high-water mark of catalog's zodb connection
try:
zodb_storage = catalog._p_jar._storage
zodb_storage._start
except AttributeError:
raise DontCache

def skip(name, value):
if name in ['b_start', 'b_size']:
return True
Expand All @@ -84,7 +91,7 @@ def skip(name, value):
index = catalog.getIndex(name)
if IIndexCounter.providedBy(index):
if (
not index._p_jar or catalog._p_jar._storage is not
not index._p_jar or zodb_storage is not
index._p_jar._storage
):
# paranoid check; if the catalog and the indexes
Expand Down Expand Up @@ -160,26 +167,20 @@ def __init__(self, cache=dict()):

def __call__(self, func):

@wraps(func)
def decorated(catalog, plan, query):
try:
query_key = CatalogQueryKey(catalog, query).key
except DontCache:
return func(catalog, plan, query)

try:
# check high-water mark of catalog's zodb connection
zodb_storage = catalog._p_jar._storage
zodb_storage._start
except AttributeError:
return func(catalog, plan, query)

key = '{0}.{1}:{2}'.format(func.__module__,
func.__name__, query_key)

# convert key to 64 bit hash (not really required)
oid = p64(hash(key) & ((1 << 64) - 1))

tca = TransactionalCacheAdapter(zodb_storage, self.cache)
tca = TransactionalCacheAdapter(catalog, self.cache)

try:
value = tca[oid]
Expand All @@ -199,25 +200,24 @@ def decorated(catalog, plan, query):
class TransactionalCacheAdapter(object):
""" """
lock = allocate_lock()
thread_local = _local()

def __init__(self, ref_storage, cache):
def __init__(self, instance, cache):

self.cache_adapter = cache

# get thread isolated local buffer/cache
buffer_id = '_v_{0}_buffer'.format(self.__class__.__name__)
try:
self._cache = getattr(self.thread_local, buffer_id)
self._cache = getattr(instance, buffer_id)
except AttributeError:
setattr(self.thread_local, buffer_id, {})
self._cache = getattr(self.thread_local, buffer_id)
setattr(instance, buffer_id, {})
self._cache = getattr(instance, buffer_id)

# commit buffer
self._uncommitted = {}

self._ref_storage = ref_storage
self._start = ref_storage._start
self._zodb_storage = instance._p_jar._storage
self._start = self._zodb_storage._start
self._tid = z64
self._registered = False

Expand Down Expand Up @@ -291,7 +291,7 @@ def _prepare(self):
raise ReadConflictError

# get lastTransaction of catalog's zodb connection
self._tid = self._ref_storage.lastTransaction()
self._tid = self._zodb_storage.lastTransaction()

def _commit(self):
"""
Expand Down
49 changes: 40 additions & 9 deletions src/Products/ZCatalog/tests/test_cache.py
Expand Up @@ -215,7 +215,7 @@ def test_cache_invalidate(self):
transaction.get().commit()

stats = cache.getStatistics()

hits = stats[0]['hits']
misses = stats[0]['misses']

Expand All @@ -227,7 +227,7 @@ def test_cache_invalidate(self):
transaction.get().commit()

stats = cache.getStatistics()

# check if cache misses (__getitem__ + commit)
self.assertEqual(stats[0]['hits'], hits)
self.assertEqual(stats[0]['misses'], misses + 2)
Expand All @@ -238,6 +238,9 @@ def test_cache_invalidate(self):
self.assertEqual(rset1, rset2)

def test_cache_mvcc_concurrent_writes(self):
# remember: a search result corresponds to two key
# value pairs in the cache

st = MinimalMemoryStorage()
db = DB(st)

Expand Down Expand Up @@ -278,10 +281,12 @@ def test_cache_mvcc_concurrent_writes(self):
r2['zcat'].catalog_object(obj, obj.id)

res2 = r2['zcat'].search(query)
# cache miss
indexed_ids = {rec.id for rec in res2}
self.assertTrue(obj.id in indexed_ids)

# raise conflict error because catalog was changed in tm1
# don't prepare set for after commit
self.assertRaises(ConflictError, tm2.get().commit)

tm2.get().abort()
Expand All @@ -291,7 +296,9 @@ def test_cache_mvcc_concurrent_writes(self):
obj = Dummy(22)
r2['zcat'].catalog_object(obj, obj.id)

# cache miss and prepare set for after commit
res2 = r2['zcat'].search(query)

indexed_ids = {rec.id for rec in res2}
self.assertTrue(obj.id in indexed_ids)

Expand All @@ -307,25 +314,49 @@ def test_cache_mvcc_concurrent_writes(self):
{('big', (True,), 12)})))
self.assertEqual(qkey, expect)

# cache store

# cache miss and prepare set for after commit
res1 = r1['zcat'].search(query)

# cache store
transaction.get().commit()


stats = cache.getStatistics()
hits = stats[0]['hits']
misses = stats[0]['misses']
self.assertEqual((hits, misses), (0, 5))

r2 = cn2.root()
# cache hit

# cache hit
res2 = r2['zcat'].search(query)
transaction.get().commit()

# compare result
rset1 = list(map(lambda x: x.getRID(), res1))
rset2 = list(map(lambda x: x.getRID(), res2))
self.assertEqual(rset1, rset2)
cache = _get_cache()

stats = cache.getStatistics()

hits = stats[0]['hits']
misses = stats[0]['misses']
self.assertEqual((hits, misses), (2, 4))
self.assertEqual((hits, misses), (2, 5))

# check usage instance cache
r2 = cn2.root()
# cache hit
res21 = r2['zcat'].search(query)
# instance cache hit
res22 = r2['zcat'].search(query)

# compare result
rset21 = list(map(lambda x: x.getRID(), res21))
rset22 = list(map(lambda x: x.getRID(), res22))
self.assertEqual(rset21, rset22)

stats = cache.getStatistics()
hits = stats[0]['hits']
misses = stats[0]['misses']
self.assertEqual((hits, misses), (4, 5))

transaction.get().commit()

0 comments on commit 8eb64f2

Please sign in to comment.