-
Notifications
You must be signed in to change notification settings - Fork 2.2k
/
cache.py
136 lines (108 loc) · 5.19 KB
/
cache.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import pickle
import tempfile
from typing import TYPE_CHECKING, Any, Iterator, Optional

from jina.executors.indexers import BaseKVIndexer
# Keys shared with the cache drivers: the kwarg that carries the cached value,
# and the two Document fields a cache may be keyed on.
DATA_FIELD = 'data'
ID_KEY = 'id'
CONTENT_HASH_KEY = 'content_hash'

if TYPE_CHECKING:
    # import only for type annotations; avoids a runtime import cycle.
    # (replaces the old `if False:` hack with the idiomatic typing guard)
    from jina.types.document import UniqueId
class BaseCache(BaseKVIndexer):
    """Base class of the cache inherited :class:`BaseKVIndexer`

    The difference between a cache and a :class:`BaseKVIndexer` is the ``handler_mutex``
    is released in cache, this allows one to query-while-indexing.
    """

    # NOTE: the redundant pass-through ``__init__`` was removed; Python falls
    # back to ``BaseKVIndexer.__init__`` automatically, with identical behavior.

    def post_init(self):
        # for Cache we need to release the handler mutex to allow RW at the same time
        self.handler_mutex = False
class DocIDCache(BaseCache):
    """A key-value indexer that specializes in caching.

    Serializes the cache to two files, one for ids, one for the actually cached field.
    If field=`id`, then the second file is redundant. The class optimizes the process
    so that there are no duplicates.
    """

    class CacheHandler:
        """Holds the in-memory cache state (ids and cached field values) and
        (de)serializes it to ``<path>.ids`` / ``<path>.cache``."""

        def __init__(self, path, logger):
            """
            :param path: base path for the two pickle files
            :param logger: used to warn when no previous cache files exist
            """
            self.path = path
            try:
                # TODO maybe mmap?
                # context managers ensure the file handles are closed
                # (``pickle.load(open(...))`` previously leaked them)
                with open(path + '.ids', 'rb') as ids_fp:
                    self.ids = pickle.load(ids_fp)
                with open(path + '.cache', 'rb') as cache_fp:
                    self.content_hash = pickle.load(cache_fp)
            except FileNotFoundError as e:
                logger.warning(
                    f'File path did not exist : {path}.ids or {path}.cache: {e!r}. Creating new CacheHandler...')
                self.ids = []
                self.content_hash = []

        def close(self):
            """Flush both lists back to disk."""
            with open(self.path + '.ids', 'wb') as ids_fp:
                pickle.dump(self.ids, ids_fp)
            with open(self.path + '.cache', 'wb') as cache_fp:
                pickle.dump(self.content_hash, cache_fp)

    supported_fields = [ID_KEY, CONTENT_HASH_KEY]
    default_field = ID_KEY

    def __init__(self, index_filename: str = None, *args, **kwargs):
        """Creates a new DocIDCache

        :param index_filename: file to serialize to; a temp file is created when omitted
        :param field: to be passed as kwarg. This dictates by which Document field we cache (either `id` or `content_hash`)
        :raises ValueError: when ``field`` is not one of :attr:`supported_fields`
        """
        if not index_filename:
            # create a new temp file if not exist
            index_filename = tempfile.NamedTemporaryFile(delete=False).name
        super().__init__(index_filename, *args, **kwargs)
        self.field = kwargs.get('field', self.default_field)
        if self.field not in self.supported_fields:
            raise ValueError(f"Field '{self.field}' not in supported list of {self.supported_fields}")

    def add(self, doc_id: 'UniqueId', *args, **kwargs):
        """Add a Document to the cache.

        :param doc_id: the Document id
        :param kwargs: when caching by ``content_hash``, the ``data`` kwarg must
            carry the value to cache
        :raises ValueError: when caching by ``content_hash`` and ``data`` is missing
        """
        self.query_handler.ids.append(doc_id)

        # optimization. don't duplicate ids when caching by id only
        if self.field != ID_KEY:
            data = kwargs.get(DATA_FIELD, None)
            if data is None:
                raise ValueError(f'Got None from CacheDriver')
            self.query_handler.content_hash.append(data)
        self._size += 1

    def query(self, data, *args, **kwargs) -> Optional[bool]:
        """Check whether the data exists in the cache

        :param data: either the id or the content_hash of a Document
        :return: ``True`` when cached, ``None`` otherwise (drivers treat ``None`` as a miss)
        """
        # FIXME this shouldn't happen
        if self.query_handler is None:
            self.query_handler = self.get_query_handler()

        known = self.query_handler.ids if self.field == ID_KEY else self.query_handler.content_hash
        return (data in known) or None

    def update(self, keys: Iterator['UniqueId'], values: Iterator[Any], *args, **kwargs):
        """Update the cached field for the given Document ids.

        :param keys: list of Document.id
        :param values: list of either `id` or `content_hash` of :class:`Document`
        """
        if self.field == ID_KEY:
            # if we don't cache anything besides the id, there is nothing to update
            return

        keys, values = self._filter_nonexistent_keys_values(keys, values, self.query_handler.ids, self.save_abspath)
        for key, cached_field in zip(keys, values):
            key_idx = self.query_handler.ids.index(key)
            self.query_handler.content_hash[key_idx] = cached_field

    def delete(self, keys: Iterator['UniqueId'], *args, **kwargs):
        """Remove the given Document ids (and their cached field) from the cache.

        :param keys: list of Document.id
        """
        keys = self._filter_nonexistent_keys(keys, self.query_handler.ids, self.save_abspath)
        for key in keys:
            # recompute the index each pass: earlier deletions shift positions
            key_idx = self.query_handler.ids.index(key)
            del self.query_handler.ids[key_idx]
            if self.field != ID_KEY:
                del self.query_handler.content_hash[key_idx]
            self._size -= 1

    def get_add_handler(self):
        """Not needed, as the query handler is used for writes as well."""
        # FIXME better way to silence warnings
        return 1

    def get_query_handler(self) -> CacheHandler:
        """Load (or freshly create) the :class:`CacheHandler` backing this cache."""
        return self.CacheHandler(self.index_abspath, self.logger)

    def get_create_handler(self):
        """Not needed, as the query handler is used for writes as well."""
        # FIXME better way to silence warnings
        return 1