Skip to content

Commit

Permalink
refactor: index performance (#1873)
Browse files Browse the repository at this point in the history
* refactor: index performance

* refactor: fix indexer filter

* refactor: index performance
  • Loading branch information
florian-hoenicke committed Feb 5, 2021
1 parent 97c6b59 commit e4e48b7
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 33 deletions.
35 changes: 8 additions & 27 deletions jina/executors/indexers/__init__.py
Expand Up @@ -2,7 +2,7 @@
__license__ = "Apache-2.0"

import os
from typing import Tuple, List, Optional, Any, Iterable
from typing import Tuple, Optional, Any, Iterable

import numpy as np

Expand Down Expand Up @@ -196,32 +196,13 @@ def flush(self):
except:
pass

def _filter_nonexistent_keys_values(self, keys: Iterable, values: Iterable, existent_keys: Iterable,
check_path: str) -> Tuple[List, List]:
keys = list(keys)
values = list(values)
if len(keys) != len(values):
raise ValueError(f'Keys of length {len(keys)} did not match values of length {len(values)}')
indices_to_drop = self._get_indices_to_drop(keys, existent_keys, check_path)
keys = [keys[i] for i in range(len(keys)) if i not in indices_to_drop]
values = [values[i] for i in range(len(values)) if i not in indices_to_drop]
return keys, values

def _filter_nonexistent_keys(self, keys: Iterable, existent_keys: Iterable, check_path: str) -> List:
keys = list(keys)
indices_to_drop = self._get_indices_to_drop(keys, existent_keys, check_path)
keys = [keys[i] for i in range(len(keys)) if i not in indices_to_drop]
return keys

def _get_indices_to_drop(self, keys: List, existent_keys: Iterable, check_path: str):
indices_to_drop = []
for key_index, key in enumerate(keys):
if key not in existent_keys:
indices_to_drop.append(key_index)
if indices_to_drop:
self.logger.warning(
f'Key(s) {[keys[i] for i in indices_to_drop]} were not found in {check_path}. Continuing anyway...')
return indices_to_drop
def _filter_nonexistent_keys_values(self, keys: Iterable, values: Iterable, existent_keys: Iterable) -> Tuple[
Iterable, Iterable]:
filtered_list = [[key, value] for key, value in zip(keys, values) if key in existent_keys]
return [key_value[0] for key_value in filtered_list], [key_value[1] for key_value in filtered_list]

def _filter_nonexistent_keys(self, keys: Iterable, existent_keys: Iterable) -> Iterable:
return [key for key in keys if key in existent_keys]


class BaseVectorIndexer(BaseIndexer):
Expand Down
5 changes: 2 additions & 3 deletions jina/executors/indexers/keyvalue.py
Expand Up @@ -103,8 +103,7 @@ def update(self, keys: Iterable[str], values: Iterable[bytes], *args, **kwargs):
:param keys: a list of ``id``, i.e. ``doc.id`` in protobuf
:param values: serialized documents
"""
keys, values = self._filter_nonexistent_keys_values(keys, values, self.query_handler.header.keys(),
self.save_abspath)
keys, values = self._filter_nonexistent_keys_values(keys, values, self.query_handler.header.keys())
self._delete(keys)
self.add(keys, values)

Expand All @@ -128,7 +127,7 @@ def delete(self, keys: Iterable[str], *args, **kwargs):
:param keys: a list of ``id``, i.e. ``doc.id`` in protobuf
"""
keys = self._filter_nonexistent_keys(keys, self.query_handler.header.keys(), self.save_abspath)
keys = self._filter_nonexistent_keys(keys, self.query_handler.header.keys())
self._delete(keys)


Expand Down
6 changes: 3 additions & 3 deletions jina/executors/indexers/vector.py
Expand Up @@ -144,7 +144,7 @@ def update(self, keys: Iterable[str], values: Sequence[bytes], *args, **kwargs)
:param values: embeddings
"""
# noinspection PyTypeChecker
keys, values = self._filter_nonexistent_keys_values(keys, values, self._ext2int_id.keys(), self.save_abspath)
keys, values = self._filter_nonexistent_keys_values(keys, values, self._ext2int_id.keys())
np_keys = np.array(keys, (np.str_, self.key_length))

if np_keys.size:
Expand All @@ -163,7 +163,7 @@ def delete(self, keys: Iterable[str], *args, **kwargs) -> None:
:param keys: a list of ``id``, i.e. ``doc.id`` in protobuf
"""
keys = self._filter_nonexistent_keys(keys, self._ext2int_id.keys(), self.save_abspath)
keys = self._filter_nonexistent_keys(keys, self._ext2int_id.keys())
np_keys = np.array(keys, (np.str_, self.key_length))
self._delete(np_keys)

Expand Down Expand Up @@ -218,7 +218,7 @@ def query_by_key(self, keys: Sequence[str], *args, **kwargs) -> Optional['np.nda
:param keys: a list of ``id``, i.e. ``doc.id`` in protobuf
:return: ndarray of vectors
"""
keys = self._filter_nonexistent_keys(keys, self._ext2int_id.keys(), self.save_abspath)
keys = self._filter_nonexistent_keys(keys, self._ext2int_id.keys())
if keys:
indices = [self._ext2int_id[key] for key in keys]
return self._raw_ndarray[indices]
Expand Down

0 comments on commit e4e48b7

Please sign in to comment.