fix: kv indexer (#1628)
cristianmtr committed Jan 8, 2021
1 parent 36bb513 commit 4b67bce
Showing 9 changed files with 86 additions and 80 deletions.
4 changes: 0 additions & 4 deletions .gitignore
@@ -133,7 +133,3 @@ shell/jina-wizard.sh
# IntelliJ IDEA
*.iml
.idea

# tests binaries
/tests/**/*.bin
*.bin
12 changes: 12 additions & 0 deletions jina/executors/indexers/__init__.py
@@ -162,6 +162,18 @@ def flush(self):
except:
pass

def _filter_nonexistent_keys(self, keys: Iterator, existent_keys: Iterator, check_path: str):
indices_to_drop = []
keys = list(keys)
for key_index, key in enumerate(keys):
if key not in existent_keys:
indices_to_drop.append(key_index)
if indices_to_drop:
self.logger.warning(
f'Key(s) {[keys[i] for i in indices_to_drop]} were not found in {check_path}. Continuing anyway...')
keys = [keys[i] for i in range(len(keys)) if i not in indices_to_drop]
return keys


class BaseVectorIndexer(BaseIndexer):
"""An abstract class for vector indexer. It is equipped with drivers in ``requests.on``
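The new `_filter_nonexistent_keys` helper is the heart of this fix: instead of hard-failing with a KeyError, indexers now drop missing keys, log a warning, and continue with the rest. A minimal standalone sketch of that behavior (toy code for illustration only; `print` stands in for `self.logger.warning`, and the `set()` is added just for clarity):

```python
from typing import Iterator, List

def filter_nonexistent_keys(keys: Iterator, existent_keys: Iterator, check_path: str) -> List:
    # keep only the keys that exist; warn about the rest instead of raising KeyError
    keys = list(keys)
    existent = set(existent_keys)
    missing = [k for k in keys if k not in existent]
    if missing:
        print(f'Key(s) {missing} were not found in {check_path}. Continuing anyway...')
    return [k for k in keys if k in existent]

assert filter_nonexistent_keys([1, 2, 99], [1, 2, 3], '/tmp/index.bin') == [1, 2]
```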
9 changes: 2 additions & 7 deletions jina/executors/indexers/cache.py
@@ -3,7 +3,6 @@
from typing import Optional, Iterator

from jina.executors.indexers import BaseKVIndexer
from jina.helper import check_keys_exist

DATA_FIELD = 'data'
ID_KEY = 'id'
@@ -99,9 +98,7 @@ def update(self, keys: Iterator['UniqueId'], values: Iterator[any], *args, **kwargs):
"""
:param keys: list of Document.id
:param values: list of either `id` or `content_hash` of :class:`Document"""
missed = check_keys_exist(keys, self.query_handler.ids)
if missed:
raise KeyError(f'Keys {missed} were not found in {self.index_abspath}. No operation performed...')
keys = self._filter_nonexistent_keys(keys, self.query_handler.ids, self.save_abspath)

for key, cached_field in zip(keys, values):
key_idx = self.query_handler.ids.index(key)
@@ -113,9 +110,7 @@ def delete(self, keys: Iterator['UniqueId'], *args, **kwargs):
"""
:param keys: list of Document.id
"""
missed = check_keys_exist(keys, self.query_handler.ids)
if missed:
raise KeyError(f'Keys {missed} were not found in {self.index_abspath}. No operation performed...')
keys = self._filter_nonexistent_keys(keys, self.query_handler.ids, self.save_abspath)

for key in keys:
key_idx = self.query_handler.ids.index(key)
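With the helper in place, `DocIDCache.update` and `DocIDCache.delete` no longer abort the whole call when an unknown id appears; they operate on whichever keys survive the filter. A rough dict-backed sketch of the new contract (hypothetical toy class, not the real indexer; it pairs values with keys before filtering so the two stay aligned):

```python
class ToyDocIDCache:
    """Dict-backed stand-in for DocIDCache's new warn-and-skip contract."""

    def __init__(self):
        self.store = {}  # doc id -> cached field (id or content_hash)

    def _filter(self, keys):
        missing = [k for k in keys if k not in self.store]
        if missing:
            print(f'Key(s) {missing} were not found. Continuing anyway...')
        return [k for k in keys if k in self.store]

    def update(self, keys, values):
        pairs = dict(zip(keys, values))          # align values to keys up front
        for k in self._filter(list(pairs)):
            self.store[k] = pairs[k]

    def delete(self, keys):
        for k in self._filter(list(keys)):
            del self.store[k]

cache = ToyDocIDCache()
cache.store = {'doc-1': 'hash-a'}
cache.update(['doc-1', 'doc-404'], ['hash-b', 'decoy'])  # warns, still updates doc-1
assert cache.store == {'doc-1': 'hash-b'}
```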
24 changes: 10 additions & 14 deletions jina/executors/indexers/keyvalue.py
@@ -78,22 +78,14 @@ def query(self, key: int) -> Optional[bytes]:
return m[r:]

def update(self, keys: Iterator[int], values: Iterator[bytes], *args, **kwargs):
# check: hard fail (raises) on key not found
missed = []
for key in keys:
if self.query_handler.header.get(key) is None:
missed.append(key)
if missed:
raise KeyError(f'Key(s) {missed} were not found in {self.save_abspath}')

# hack
self.query_handler.close()
self.handler_mutex = False
self.delete(keys)
keys = self._filter_nonexistent_keys(keys, self.query_handler.header.keys(), self.save_abspath)
self._delete(keys)
self.add(keys, values)
return

def delete(self, keys: Iterator[int], *args, **kwargs):
def _delete(self, keys: Iterator[int]):
self.query_handler.close()
self.handler_mutex = False
for key in keys:
self.write_handler.header.write(
np.array(
@@ -102,9 +94,13 @@ def delete(self, keys: Iterator[int], *args, **kwargs):
).tobytes()
)
if self.query_handler:
self.query_handler.header[key] = None
del self.query_handler.header[key]
self._size -= 1

def delete(self, keys: Iterator[int], *args, **kwargs):
keys = self._filter_nonexistent_keys(keys, self.query_handler.header.keys(), self.save_abspath)
self._delete(keys)


class DataURIPbIndexer(BinaryPbIndexer):
"""Shortcut for :class:`DocPbIndexer` equipped with ``requests.on`` for storing doc-level protobuf and data uri info,
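The `delete`/`_delete` split shown above lets the public `delete` validate keys once, while `update`, which has already filtered, calls `_delete` directly and skips a second pass. A compact sketch of the pattern (toy in-memory store; the real class writes deletions into a binary header file):

```python
class ToyKVIndexer:
    def __init__(self):
        self.header = {}  # key -> value, mirroring query_handler.header

    def _filter(self, keys):
        return [k for k in keys if k in self.header]

    def _delete(self, keys):
        # private path: assumes the caller already filtered the keys
        for k in keys:
            del self.header[k]

    def delete(self, keys):
        # public path: filter, then reuse the private deletion
        self._delete(self._filter(list(keys)))

    def update(self, keys, values):
        pairs = dict(zip(keys, values))
        keys = self._filter(list(pairs))
        self._delete(keys)               # keys already filtered: no second pass
        for k in keys:
            self.header[k] = pairs[k]

kv = ToyKVIndexer()
kv.header = {1: b'old'}
kv.update([1, 99], [b'new', b'decoy'])   # 99 is skipped, 1 is rewritten
assert kv.header == {1: b'new'}
```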
48 changes: 27 additions & 21 deletions jina/executors/indexers/vector.py
@@ -126,26 +126,29 @@ def add(self, keys: 'np.ndarray', vectors: 'np.ndarray', *args, **kwargs) -> None:
self.key_dtype = keys.dtype.name
self._size += keys.shape[0]

def _check_keys(self, keys: Sequence[int]) -> None:
missed = []
for key in keys:
# if it never existed or if it's been marked as deleted in the current index
# using `is False` doesn't work
if key not in self.ext2int_id.keys() or self.valid_indices[self.ext2int_id[key]] == False: # noqa
missed.append(key)
if missed:
raise KeyError(f'Key(s) {missed} were not found in {self.save_abspath}')

def update(self, keys: Sequence[int], values: Sequence[bytes], *args, **kwargs) -> None:
self.delete(keys)
self.add(np.array(keys), np.array(values))
keys = self._filter_nonexistent_keys(keys, self.ext2int_id.keys(), self.save_abspath)
# could be empty
if keys:
# expects np array for computing shapes
keys = np.array(list(keys))
self._delete(keys)
self.add(np.array(keys), np.array(values))

def _delete(self, keys):
# could be empty
if keys:
# expects np array for computing shapes
keys = np.array(list(keys))
for key in keys:
# mark as `False` in mask
self.valid_indices[self.ext2int_id[key]] = False
self._size -= 1

def delete(self, keys: Sequence[int], *args, **kwargs) -> None:
self._check_keys(keys)
for key in keys:
# mark as `False` in mask
self.valid_indices[self.ext2int_id[key]] = False
self._size -= 1
if kwargs.get('keys_precomputed') is not True:
keys = self._filter_nonexistent_keys(keys, self.ext2int_id.keys(), self.save_abspath)
self._delete(keys)

def get_query_handler(self) -> Optional['np.ndarray']:
"""Open a gzip file and load it as a numpy ndarray
@@ -192,15 +195,18 @@ def raw_ndarray(self) -> Optional['np.ndarray']:
return np.memmap(self.index_abspath, dtype=self.dtype, mode='r',
shape=(self.size + deleted_keys, self.num_dim))

def query_by_id(self, ids: Union[List[int], 'np.ndarray'], *args, **kwargs) -> 'np.ndarray':
def query_by_id(self, ids: Union[List[int], 'np.ndarray'], *args, **kwargs) -> Optional['np.ndarray']:
"""
Search the index by the external key (passed during `.add(`)
:param ids: The list of keys to be queried
"""
self._check_keys(ids)
indices = [self.ext2int_id[key] for key in ids]
return self.raw_ndarray[indices]
ids = self._filter_nonexistent_keys(ids, self.ext2int_id.keys(), self.save_abspath)
if ids:
indices = [self.ext2int_id[key] for key in ids]
return self.raw_ndarray[indices]
else:
return None

@cached_property
def int2ext_id(self) -> Optional['np.ndarray']:
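In the vector indexer, deletion stays a soft delete: rows remain in the underlying array and are only masked out via `valid_indices`, and the `keys_precomputed` flag lets a caller that has already filtered (such as `update`) skip re-filtering. A minimal in-memory sketch (hypothetical toy class; the real indexer memory-maps its vectors from disk):

```python
import numpy as np

class ToyVectorIndexer:
    def __init__(self, keys, vectors):
        self.ext2int_id = {k: i for i, k in enumerate(keys)}  # external key -> row
        self.vectors = np.asarray(vectors)
        self.valid_indices = np.full(len(self.ext2int_id), True)

    def _filter(self, keys):
        return [k for k in keys if k in self.ext2int_id]

    def delete(self, keys, keys_precomputed=False):
        if not keys_precomputed:
            keys = self._filter(keys)
        for k in keys:
            self.valid_indices[self.ext2int_id[k]] = False  # mask, don't move data

    def query_by_id(self, ids):
        ids = self._filter(ids)
        if not ids:
            return None  # mirrors the new Optional['np.ndarray'] return type
        return self.vectors[[self.ext2int_id[i] for i in ids]]

idx = ToyVectorIndexer([10, 11], np.eye(2))
idx.delete([11, 999])          # 999 is dropped by the toy filter (the real helper logs a warning)
assert idx.query_by_id([999]) is None
np.testing.assert_array_equal(idx.query_by_id([10]), [[1.0, 0.0]])
```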
9 changes: 1 addition & 8 deletions jina/helper.py
@@ -685,6 +685,7 @@ def run(self):
from .excepts import BadClient
raise BadClient('something wrong when running the eventloop, result can not be retrieved')
else:

raise RuntimeError('you have an eventloop running but not using Jupyter/ipython, '
'this may mean you are using Jina with other integration? if so, then you '
'may want to use AsyncClient/AsyncFlow instead of Client/Flow. If not, then '
@@ -693,14 +694,6 @@
return asyncio.run(func(*args, **kwargs))


def check_keys_exist(keys_to_check, existing_keys):
missed = []
for k in keys_to_check:
if k not in existing_keys:
missed.append(k)
return missed


def slugify(value):
"""
Normalizes string, converts to lowercase, removes non-alpha characters,
25 changes: 13 additions & 12 deletions tests/unit/drivers/test_cache_driver.py
@@ -27,11 +27,11 @@ def docs(self):
return DocumentSet(list(random_docs(10)))


def test_cache_driver_twice(tmpdir):
def test_cache_driver_twice(tmpdir, test_metas):
docs = DocumentSet(list(random_docs(10)))
driver = MockCacheDriver()
# FIXME DocIdCache doesn't use tmpdir, it saves in curdir
with DocIDCache(tmpdir) as executor:
with DocIDCache(tmpdir, metas=test_metas) as executor:
assert not executor.handler_mutex
driver.attach(executor=executor, runtime=None)
driver._traverse_apply(docs)
@@ -49,10 +49,10 @@ def test_cache_driver_twice(tmpdir):
assert os.path.exists(filename)


def test_cache_driver_tmpfile():
def test_cache_driver_tmpfile(tmpdir, test_metas):
docs = list(random_docs(10, embedding=False))
driver = MockCacheDriver()
with DocIDCache(field=ID_KEY) as executor:
with DocIDCache(tmpdir, field=ID_KEY, metas=test_metas) as executor:
assert not executor.handler_mutex
driver.attach(executor=executor, runtime=None)

@@ -69,14 +69,15 @@ def test_cache_driver_tmpfile():
assert os.path.exists(executor.index_abspath)


def test_cache_driver_from_file(tmp_path):
def test_cache_driver_from_file(tmpdir, test_metas):
filename = 'test-tmp.bin'
docs = list(random_docs(10, embedding=False))
pickle.dump([doc.id for doc in docs], open(f'{filename}.ids', 'wb'))
pickle.dump([doc.content_hash for doc in docs], open(f'{filename}.cache', 'wb'))
pickle.dump([doc.id for doc in docs], open(f'{os.path.join(test_metas["workspace"], filename)}.ids', 'wb'))
pickle.dump([doc.content_hash for doc in docs],
open(f'{os.path.join(test_metas["workspace"], filename)}.cache', 'wb'))

driver = MockCacheDriver()
with DocIDCache(filename, field=CONTENT_HASH_KEY) as executor:
with DocIDCache(filename, metas=test_metas, field=CONTENT_HASH_KEY) as executor:
assert not executor.handler_mutex
driver.attach(executor=executor, runtime=None)

@@ -102,7 +103,7 @@ def on_hit(self, req_doc: 'jina_pb2.DocumentProto', hit_result: Any) -> None:
raise NotImplementedError


def test_cache_content_driver_same_content(tmpdir):
def test_cache_content_driver_same_content(tmpdir, test_metas):
doc1 = Document(id=1)
doc1.text = 'blabla'
doc1.update_content_hash()
@@ -117,7 +118,7 @@ def test_cache_content_driver_same_content(tmpdir):
driver = MockBaseCacheDriver()
filename = None

with DocIDCache(tmpdir, field=CONTENT_HASH_KEY) as executor:
with DocIDCache(tmpdir, metas=test_metas, field=CONTENT_HASH_KEY) as executor:
driver.attach(executor=executor, runtime=None)
driver._traverse_apply(docs1)

@@ -150,7 +151,7 @@ def test_cache_content_driver_same_content(tmpdir):
assert executor.query(doc1.content_hash) is None


def test_cache_content_driver_same_id(tmp_path):
def test_cache_content_driver_same_id(tmp_path, test_metas):
filename = tmp_path / 'docidcache.bin'
doc1 = Document(id=1)
doc1.text = 'blabla'
@@ -164,7 +165,7 @@ def test_cache_content_driver_same_id(tmp_path):

driver = MockBaseCacheDriver()

with DocIDCache(filename, field=CONTENT_HASH_KEY) as executor:
with DocIDCache(filename, metas=test_metas, field=CONTENT_HASH_KEY) as executor:
driver.attach(executor=executor, runtime=None)
driver._traverse_apply(docs1)
driver._traverse_apply(docs2)
20 changes: 12 additions & 8 deletions tests/unit/executors/indexers/test_binary_indexer.py
@@ -49,14 +49,6 @@ def test_binarypb_update1(test_metas):
with BaseIndexer.load(save_abspath) as idxer:
assert idxer.query(1) == b'oldvalue'

with BaseIndexer.load(save_abspath) as idxer:
# no update triggered AT ALL when encountering missing key
# atomic op. at indexer level
with pytest.raises(KeyError):
idxer.update([1, 2, 99], [b'newvalue', b'same', b'decoy'])

idxer.save()

with BaseIndexer.load(save_abspath) as idxer:
assert idxer.query(1) == b'oldvalue'
second_size = os.fstat(idxer.query_handler._body.fileno()).st_size
@@ -76,6 +68,18 @@ def test_binarypb_update1(test_metas):
assert idxer.query(3) == b'random'
assert idxer.query(99) is None

with BaseIndexer.load(save_abspath) as idxer:
# partial update when missing keys encountered
idxer.update([1, 2, 99], [b'newvalue2', b'newvalue3', b'decoy'])
idxer.save()
assert idxer.size == 3

with BaseIndexer.load(save_abspath) as idxer:
assert idxer.query(1) == b'newvalue2'
assert idxer.query(2) == b'newvalue3'
assert idxer.query(3) == b'random'
assert idxer.query(99) is None


def test_binarypb_add_and_update_not_working(test_metas):
with BinaryPbIndexer(metas=test_metas) as idxer:
15 changes: 9 additions & 6 deletions tests/unit/executors/indexers/test_numpyindexer.py
@@ -270,9 +270,8 @@ def test_numpy_update_delete(compress_level, test_metas):

with BaseIndexer.load(save_abspath) as indexer:
assert isinstance(indexer, NumpyIndexer)
# this will fail cause the key doesn't exist
with pytest.raises(KeyError):
indexer.update(random_keys, random_data)
# NON-EXISTENT KEYS: this will log warning but not fail
indexer.update(random_keys, random_data)
indexer.update([key_to_update], data_to_update)
indexer.save()

@@ -296,9 +295,8 @@
with BaseIndexer.load(save_abspath) as indexer:
assert isinstance(indexer, NumpyIndexer)
assert indexer.size == len(vec_idx) - keys_to_delete
# this will fail. key doesn't exist
with pytest.raises(KeyError):
_ = indexer.query_by_id(vec_idx)
# random non-existent key
assert indexer.query_by_id([123861942]) is None
query_results = indexer.query_by_id(vec_idx[keys_to_delete:])
expected = vec[keys_to_delete:]
np.testing.assert_allclose(query_results, expected, equal_nan=True)
@@ -362,6 +360,11 @@ def test_numpy_indexer_known_and_delete(batch_size, compress_level, test_metas):
assert idx.shape == (len(queries), top_k)
np.testing.assert_equal(indexer.query_by_id([6, 5]), vectors[[2, 1]])

# test query by nonexistent key
with BaseIndexer.load(save_abspath) as indexer:
assert isinstance(indexer, NumpyIndexer)
assert indexer.query_by_id([91237124]) is None


@pytest.mark.parametrize('compress_level', [0, 1, 2, 3])
def test_numpy_indexer_with_ref_indexer(compress_level, test_metas):
