/
cache.py
158 lines (125 loc) · 4.71 KB
/
cache.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
##############################################################################
#
# Copyright (c) 2010 Zope Foundation and Contributors.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
from Acquisition import aq_base
from Acquisition import aq_parent
from DateTime import DateTime
from plone.memoize.volatile import DontCache
from Products.PluginIndexes.interfaces import IIndexCounter
from Products.PluginIndexes.interfaces import IDateRangeIndex
from Products.PluginIndexes.interfaces import IDateIndex
from plone.memoize import ram
class CatalogCacheKey(object):
    """Compute a cache key for the result of applying a catalog query.

    After construction, ``key`` holds either a hashable tuple
    ``(cid, frozenset_of_index_terms)`` or ``None`` when no safe key
    can be derived for the query.
    """

    def __init__(self, catalog, query=None):
        self.catalog = catalog
        self.cid = self.get_id()
        self.query = query
        self.key = self.make_key(query)

    def get_id(self):
        """Return a tuple identifying the catalog via its parent's path.

        A fixed placeholder tuple is returned when the acquisition
        parent of the catalog does not offer ``getPhysicalPath``.
        """
        parent = aq_parent(self.catalog)
        path = getattr(aq_base(parent), 'getPhysicalPath', None)
        if path is None:
            return ('', 'NonPersistentCatalog')
        return tuple(parent.getPhysicalPath())

    def make_key(self, query):
        """Return ``(cid, frozenset(terms))`` for ``query`` or ``None``.

        ``None`` is returned when ``query`` is ``None``, when an index
        used by the query does not provide ``IIndexCounter``, or when
        the query holds a key that is neither an index name nor a
        sort/pagination option.
        """
        if query is None:
            return None

        catalog = self.catalog

        def skip(name, value):
            # Pagination and sort options are not part of the key.
            if name in ('b_start', 'b_size'):
                return True
            option = {name: value}
            return any(
                catalog._get_sort_attr(attr, option)
                for attr in ('on', 'limit', 'order')
            )

        keys = []
        for name, value in query.items():
            if name in catalog.indexes:
                index = catalog.getIndex(name)
                if not IIndexCounter.providedBy(index):
                    # cache key invalidation cannot be supported if
                    # any index of query cannot be tested for changes
                    return None
                counter = index.getCounter()
            elif skip(name, value):
                # applying the query to indexes is invariant of
                # sort or pagination options
                continue
            else:
                # return None if query has a nonexistent index key
                return None

            if isinstance(value, dict):
                value = frozenset(
                    (k, self._convert_datum(index, v))
                    for k, v in value.items()
                )
            else:
                value = self._convert_datum(index, value)

            # The index counter is part of the key, so a changed counter
            # yields a different key.
            keys.append((name, value, counter))

        return (self.cid, frozenset(keys))

    def _convert_datum(self, index, value):
        """Normalize a query term into a tuple.

        Lists and tuples become a sorted tuple of their elements; any
        other value becomes a one-element tuple.  ``DateTime`` terms are
        first passed through the converter of ``index`` when it provides
        ``IDateRangeIndex`` or ``IDateIndex``.
        """

        def convert_datetime(dt):
            if IDateRangeIndex.providedBy(index):
                return index._convertDateTime(dt)
            if IDateIndex.providedBy(index):
                return index._convert(dt)
            # Not a date index: keep the term itself, not the enclosing
            # ``value`` (which may be the whole sequence being walked).
            return dt

        if isinstance(value, (list, tuple)):
            res = [
                convert_datetime(v) if isinstance(v, DateTime) else v
                for v in value
            ]
            res.sort()
            return tuple(res)

        if isinstance(value, DateTime):
            # Wrap the single converted term; calling tuple() on it
            # would fail for a non-iterable result (presumably an int
            # for the stock date indexes -- TODO confirm).
            return (convert_datetime(value),)

        return (value,)
# plone.memoize cache key function
def _apply_query_plan_cachekey(method, catalog, plan, query):
    """Return the cache key for ``query`` run against ``catalog``.

    Raises ``DontCache`` when ``CatalogCacheKey`` yields no key.
    ``method`` and ``plan`` are accepted but not used here.
    """
    cache_key = CatalogCacheKey(catalog, query).key
    if cache_key is not None:
        return cache_key
    raise DontCache
def cache(fun):
    """Wrap ``fun`` so calls go through ``ram.cache``.

    The key function is ``_apply_query_plan_cachekey``.
    """
    # NOTE(review): plone.memoize appears to build its key prefix from
    # the wrapped function's module and name, so the inner function
    # keeps the name ``decorator`` -- confirm before renaming.
    def decorator(*args, **kwargs):
        return fun(*args, **kwargs)

    memoize = ram.cache(_apply_query_plan_cachekey)
    return memoize(decorator)
# Test isolation support; this only works with the ramcache backend.
def _get_cache():
    """Return the ``ramcache`` object of the adapter used for caching.

    Raises ``AttributeError`` when the adapter has no ``ramcache``
    attribute.
    """
    adapter = ram.store_in_cache(_apply_query_plan_cachekey)
    if not hasattr(adapter, 'ramcache'):
        raise AttributeError('Only ramcache supported for testing')
    return adapter.ramcache
def _cache_clear():
    """Invalidate every entry of the cache returned by ``_get_cache``."""
    _get_cache().invalidateAll()
# Register the cache reset with zope.testing's cleanup registry.
from zope.testing.cleanup import addCleanUp # NOQA
addCleanUp(_cache_clear)
# Remove the imported helper name from this module's namespace again.
del addCleanUp