-
Notifications
You must be signed in to change notification settings - Fork 2.2k
/
keyvalue.py
136 lines (107 loc) · 4.77 KB
/
keyvalue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# Module metadata: license identifier and copyright notice.
__license__ = "Apache-2.0"
__copyright__ = "Copyright (c) 2020 Jina AI Limited. All rights reserved."
import mmap
from typing import Iterable, Optional
import numpy as np
from . import BaseKVIndexer
from ..compound import CompoundExecutor
#: The three integer fields of a header record that marks a key as deleted (a tombstone).
HEADER_NONE_ENTRY = (-1,) * 3
class BinaryPbIndexer(BaseKVIndexer):
    """Key-value indexer storing serialized documents in a flat binary file.

    Two files are used side by side:

    * ``<index_abspath>`` (the *body*) receives the raw value bytes, appended one
      after another.
    * ``<index_abspath>.head`` (the *header*) receives one fixed-size record per
      write: ``(key, page_offset, start, end)``. ``page_offset`` is the value's
      start position rounded down to a multiple of ``mmap.ALLOCATIONGRANULARITY``
      so it can be used as an ``mmap`` offset; ``start``/``end`` are relative to it.

    Deletion never rewrites the body: it appends a header record whose integer
    fields equal ``HEADER_NONE_ENTRY`` (a tombstone). When the header is loaded,
    records are replayed in write order, so the latest record for a key wins.
    """

    class WriteHandler:
        """Pair of open file objects used to write the body and the header."""

        def __init__(self, path, mode):
            """Open ``path`` and ``path + '.head'`` with the same ``mode``.

            :param path: absolute path of the body file
            :param mode: mode passed to :func:`open` for both files
            """
            self.body = open(path, mode)
            try:
                self.header = open(path + '.head', mode)
            except Exception:
                # do not leak the already-opened body file
                self.body.close()
                raise

        def close(self):
            self.body.close()
            self.header.close()

        def flush(self):
            self.body.flush()
            self.header.flush()

    class ReadHandler:
        """Read-side view of the index: the decoded header and the body's file descriptor."""

        def __init__(self, path, key_length):
            """Load the header file into a dict and open the body file.

            :param path: absolute path of the body file; the header is ``path + '.head'``
            :param key_length: fixed length of the key field in each header record
            """
            with open(path + '.head', 'rb') as fp:
                records = np.frombuffer(fp.read(), dtype=BinaryPbIndexer._header_dtype(key_length))
            # Replay records in write order so the latest one for each key wins.
            # A tombstone removes the key entirely. This mirrors what ``_delete`` does to
            # the in-memory header. Keeping deleted keys would let ``delete``/``update``
            # act on them again and decrement ``_size`` a second time.
            self.header = {}
            for key, page_offset, start, end in records.tolist():
                pos_info = (page_offset, start, end)
                if pos_info == HEADER_NONE_ENTRY:
                    self.header.pop(key, None)
                else:
                    self.header[key] = pos_info
            self._body = open(path, 'r+b')
            # raw file descriptor of the body, handed to ``mmap.mmap`` in ``query``
            self.body = self._body.fileno()

        def close(self):
            self._body.close()

    @staticmethod
    def _header_dtype(key_length):
        """Return the structured dtype of one header record ``(key, page_offset, start, end)``."""
        return [('', (np.str_, key_length)), ('', np.int64), ('', np.int64), ('', np.int64)]

    def get_add_handler(self):
        """Return a handler appending to the existing body and header files."""
        # keep _start position as in pickle serialization
        return self.WriteHandler(self.index_abspath, 'ab')

    def get_create_handler(self):
        """Return a handler truncating the body and header files and writing from position 0."""
        self._start = 0  # override _start position
        return self.WriteHandler(self.index_abspath, 'wb')

    def get_query_handler(self):
        """Return a :class:`ReadHandler` over the current index files."""
        return self.ReadHandler(self.index_abspath, self._key_length)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._total_byte_len = 0
        self._start = 0  # byte position in the body where the next value is written
        self._page_size = mmap.ALLOCATIONGRANULARITY

    def add(self, keys: Iterable[str], values: Iterable[bytes], *args, **kwargs):
        """Add the serialized documents to the index via document ids.

        :param keys: a list of ``id``, i.e. ``doc.id`` in protobuf
        :param values: serialized documents
        """
        if not keys:
            return
        self._assert_key_length(keys)
        dtype = self._header_dtype(self._key_length)
        for key, value in zip(keys, values):
            length = len(value)
            # mmap offsets must be multiples of the allocation granularity: store the
            # page-aligned offset plus start/end relative to it. Integer division keeps
            # this exact for any file size (float division would not).
            page_offset = (self._start // self._page_size) * self._page_size
            start = self._start % self._page_size
            self.write_handler.header.write(
                np.array((key, page_offset, start, start + length), dtype=dtype).tobytes()
            )
            self._start += length
            self.write_handler.body.write(value)
            self._size += 1
        self.write_handler.flush()

    def query(self, key: str) -> Optional[bytes]:
        """Find the serialized document to the index via document id.

        :param key: document id
        :return: serialized documents, or ``None`` if ``key`` is not in the index
        """
        pos_info = self.query_handler.header.get(key, None)
        if pos_info is None:
            return None
        page_offset, start, end = pos_info
        if start == end:
            # Empty value: ``mmap(length=0)`` would map the *whole* file (and raises
            # on an empty file), so answer without mapping anything.
            return b''
        with mmap.mmap(self.query_handler.body, length=end, offset=page_offset,
                       access=mmap.ACCESS_READ) as m:
            return m[start:end]

    def update(self, keys: Iterable[str], values: Iterable[bytes], *args, **kwargs):
        """Update the serialized documents on the index via document ids.

        :param keys: a list of ``id``, i.e. ``doc.id`` in protobuf
        :param values: serialized documents
        """
        keys, values = self._filter_nonexistent_keys_values(keys, values, self.query_handler.header.keys())
        self._delete(keys)
        self.add(keys, values)

    def _delete(self, keys: Iterable[str]):
        """Append a tombstone header record for each key and drop it from the loaded header.

        :param keys: document ids, presumably already filtered to existing keys by the caller
        """
        # NOTE(review): closes the read handler and clears ``handler_mutex`` before writing;
        # the exact role of ``handler_mutex`` lives in the base class — confirm there.
        self.query_handler.close()
        self.handler_mutex = False
        dtype = self._header_dtype(self._key_length)
        written = False
        for key in keys:
            self.write_handler.header.write(
                np.array((key, *HEADER_NONE_ENTRY), dtype=dtype).tobytes()
            )
            written = True
            if self.query_handler:
                del self.query_handler.header[key]
            self._size -= 1
        if written:
            # flush the tombstones, as ``add`` does for new records
            self.write_handler.flush()

    def delete(self, keys: Iterable[str], *args, **kwargs):
        """Delete the serialized documents from the index via document ids.

        :param keys: a list of ``id``, i.e. ``doc.id`` in protobuf
        """
        keys = self._filter_nonexistent_keys(keys, self.query_handler.header.keys())
        self._delete(keys)
class DataURIPbIndexer(BinaryPbIndexer):
    """Alias of :class:`BinaryPbIndexer`.

    Adds no behaviour of its own; it only makes the same indexer available
    under a second class name.
    """
class UniquePbIndexer(CompoundExecutor):
    """Compound executor for a frequently used pattern.

    Combines a :class:`BaseKVIndexer` and a :class:`DocCache`. The class body adds
    nothing beyond :class:`CompoundExecutor`; presumably its components are supplied
    externally (e.g. via configuration) — TODO confirm.
    """