Skip to content

Commit

Permalink
refactor: remove unique id (#1872)
Browse files Browse the repository at this point in the history
* refactor: remove unique id

* refactor: force parent id to be string

* tests: fix cache driver

* test: fix cache driver

* test: fix workspace

* test: flow multimode

* refactor: indexer

* fix: better default handling of key length

Co-authored-by: Maximilian Werk <maximilian.werk@jina.ai>
  • Loading branch information
florian-hoenicke and maximilianwerk committed Feb 5, 2021
1 parent e4e48b7 commit 4e1db76
Show file tree
Hide file tree
Showing 19 changed files with 58 additions and 195 deletions.
3 changes: 2 additions & 1 deletion jina/clients/request/helper.py
Expand Up @@ -67,4 +67,5 @@ def _add_docs_groundtruths(req, batch, data_type, _kwargs):


def _add_ids(req, batch):
req.ids.extend(batch)
string_ids = (str(doc_id) for doc_id in batch)
req.ids.extend(string_ids)
4 changes: 1 addition & 3 deletions jina/drivers/rank/__init__.py
Expand Up @@ -4,7 +4,6 @@

from .. import BaseExecutableDriver
from ...types.document import Document
from ...types.document.uid import UniqueId
from ...types.score import NamedScore


Expand Down Expand Up @@ -65,8 +64,7 @@ def _sort_matches_in_place(self, context_doc: 'Document', match_scores: 'np.ndar
op_name = self.exec.__class__.__name__
cm = context_doc.matches
cm.build()
for str_match_id, score in match_scores:
match_id = UniqueId(str_match_id)
for match_id, score in match_scores:
cm[match_id].score = NamedScore(value=score, op_name=op_name, ref_id=context_doc.id)

cm.sort(key=lambda x: x.score.value, reverse=True)
7 changes: 3 additions & 4 deletions jina/drivers/rank/aggregate/__init__.py
Expand Up @@ -10,7 +10,6 @@
from .. import BaseRankDriver

if False:
from ....types.document.uid import UniqueId
from ....types.sets import DocumentSet


Expand Down Expand Up @@ -105,9 +104,9 @@ def _apply_all(self, docs: 'DocumentSet',
:return:
"""

match_idx = [] # type: List[Tuple[UniqueId, UniqueId, UniqueId, float]]
query_meta = {} # type: Dict[UniqueId, Dict]
match_meta = {} # type: Dict[UniqueId, Dict]
match_idx = [] # type: List[Tuple[str, str, str, float]]
query_meta = {} # type: Dict[str, Dict]
match_meta = {} # type: Dict[str, Dict]
parent_id_chunk_id_map = defaultdict(list)
matches_by_id = defaultdict(Document)
for chunk in docs:
Expand Down
24 changes: 11 additions & 13 deletions jina/executors/indexers/__init__.py
Expand Up @@ -41,6 +41,7 @@ class BaseIndexer(BaseExecutor):

def __init__(self,
index_filename: str = None,
key_length: int = None,
*args, **kwargs):
"""
Expand All @@ -51,26 +52,23 @@ def __init__(self,
super().__init__(*args, **kwargs)
self.index_filename = index_filename #: the file name of the stored index, no path is required
self._size = 0
self._key_length = 16 #: the default minimum length of the key, will be expanded one time on the first batch
self._key_length = key_length #: the default minimum length of the key, will be expanded one time on the first batch

@property
def key_length(self) -> int:
    """Return the key length (in characters) this indexer stores keys at.

    NOTE(review): initialized from the ``key_length`` constructor argument,
    whose default is ``None`` — so this may return ``None`` until the first
    batch fixes the length; confirm callers tolerate that.
    """
    return self._key_length

def _assert_key_length(self, keys) -> None:
    """Validate that every key in ``keys`` fits this indexer's key length.

    On the first batch (``self.key_length`` is still ``None``) the length is
    fixed once to ``max(16, longest key)``; afterwards any longer key is
    rejected.

    :param keys: the document ids (strings) about to be stored
    :raises ValueError: if a key exceeds the already-fixed key length
    """
    max_key_len = max([len(k) for k in keys])

    if self.key_length is None:
        # First batch seen: fix the key length once, with a floor of 16.
        # NOTE(review): assigning through the ``key_length`` property requires
        # a setter on the class — the diff view here is ambiguous about
        # whether one survives this commit; confirm against the repository.
        self.key_length = max(16, max_key_len)
    elif max_key_len > self.key_length:
        raise ValueError(f'This indexer allows only keys of length {self._key_length}, but yours is {max_key_len}.')

@key_length.setter
def key_length(self, val: int):
    """Set the max key length. """
    # NOTE(review): this rendering interleaves pre- and post-change lines of
    # the diff; the branch structure below may not match the repository —
    # confirm before relying on it.
    if not self._key_length or self._key_length < val:
        # expand once
        self._key_length = val
    elif val < self._key_length:
        # just padding, no big deal
        self.logger.warning(
            f'key padding is triggered. this indexer allows only keys at length {self._key_length}, '
            f'but your max key length is {val}.')
    elif val > self._key_length:
        # panic
        # NOTE(review): apparently unreachable — the first branch already
        # captures ``self._key_length < val``; presumably a diff artifact.
        raise ValueError(f'this indexer allows only keys at length {self._key_length}, but yours is {val}')
    # NOTE(review): unconditional overwrite contradicts the padding branch
    # above (it would shrink the key length) — verify against the repository.
    self._key_length = val

def add(self, *args, **kwargs):
"""Add documents to the index.
Expand Down
5 changes: 1 addition & 4 deletions jina/executors/indexers/cache.py
Expand Up @@ -75,16 +75,14 @@ def add(self, doc_id: str, *args, **kwargs):
self.query_handler.cache_val_to_id[data] = doc_id
self._size += 1

def query(self, data: str, *args, **kwargs) -> Optional[bool]:
    """Check whether ``data`` exists in the cache.

    :param data: either the id or the content_hash of a Document
    :return: True if the value is cached, False otherwise
    """
    # membership test against the reverse map value -> doc id
    return data in self.query_handler.cache_val_to_id


def update(self, keys: Iterable[str], values: Iterable[any], *args, **kwargs):
"""Update cached documents.
:param keys: list of Document.id
Expand All @@ -99,7 +97,6 @@ def update(self, keys: Iterable[str], values: Iterable[any], *args, **kwargs):
del self.query_handler.cache_val_to_id[old_value]
self.query_handler.cache_val_to_id[value] = key


def delete(self, keys: Iterable[str], *args, **kwargs):
"""Delete documents from the cache.
:param keys: list of Document.id
Expand Down
5 changes: 1 addition & 4 deletions jina/executors/indexers/keyvalue.py
Expand Up @@ -56,7 +56,6 @@ def __init__(self, *args, **kwargs):
self._total_byte_len = 0
self._start = 0
self._page_size = mmap.ALLOCATIONGRANULARITY
self._key_length = 0

def add(self, keys: Iterable[str], values: Iterable[bytes], *args, **kwargs):
"""Add the serialized documents to the index via document ids.
Expand All @@ -66,9 +65,7 @@ def add(self, keys: Iterable[str], values: Iterable[bytes], *args, **kwargs):
"""
if not keys:
return

max_key_len = max([len(k) for k in keys])
self.key_length = max_key_len
self._assert_key_length(keys)

for key, value in zip(keys, values):
l = len(value) #: the length
Expand Down
4 changes: 2 additions & 2 deletions jina/executors/indexers/vector.py
Expand Up @@ -124,8 +124,8 @@ def add(self, keys: Iterable[str], vectors: 'np.ndarray', *args, **kwargs) -> No
:param keys: a list of ``id``, i.e. ``doc.id`` in protobuf
:param vectors: embeddings
"""
max_key_len = max([len(k) for k in keys])
self.key_length = max_key_len
self._assert_key_length(keys)

np_keys = np.array(keys, (np.str_, self.key_length))

self._add(np_keys, vectors)
Expand Down
40 changes: 9 additions & 31 deletions jina/types/document/__init__.py
Expand Up @@ -13,7 +13,6 @@
from google.protobuf.field_mask_pb2 import FieldMask

from .converters import png_to_buffer, to_datauri, guess_mime
from .uid import DIGEST_SIZE, UniqueId
from ..mixin import ProtoTypeMixin
from ..ndarray.generic import NdArray
from ..score import NamedScore
Expand All @@ -25,6 +24,7 @@
from ...proto import jina_pb2

__all__ = ['Document', 'DocumentContentType', 'DocumentSourceType']
DIGEST_SIZE = 8

DocumentContentType = TypeVar('DocumentContentType', bytes, str, np.ndarray)
DocumentSourceType = TypeVar('DocumentSourceType',
Expand Down Expand Up @@ -261,45 +261,23 @@ def parent_id(self) -> str:

@id.setter
def id(self, value: Union[bytes, str, int]):
    """Set document id to a string value.

    Any non-string value (bytes, int, ...) is coerced with ``str`` so the
    underlying proto field always holds a string.

    :param value: id as bytes, int or str
    :return:
    """
    self._pb_body.id = str(value)


@parent_id.setter
def parent_id(self, value: Union[bytes, str, int]):
    """Set document's parent id to a string value.

    Any non-string value (bytes, int, ...) is coerced with ``str`` so the
    underlying proto field always holds a string — mirrors the ``id`` setter.

    :param value: id as bytes, int or str
    :return:
    """
    self._pb_body.parent_id = str(value)


@property
def blob(self) -> 'np.ndarray':
Expand Down
95 changes: 0 additions & 95 deletions jina/types/document/uid.py

This file was deleted.

6 changes: 0 additions & 6 deletions jina/types/sets/document.py
Expand Up @@ -36,11 +36,8 @@ def insert(self, index: int, doc: 'Document') -> None:
self._docs_proto.insert(index, doc.proto)

def __setitem__(self, key, value: 'Document'):
from ..document.uid import UniqueId
if isinstance(key, int):
self._docs_proto[key].CopyFrom(value)
elif isinstance(key, UniqueId):
self._docs_map[str(key)].CopyFrom(value)
elif isinstance(key, str):
self._docs_map[key].CopyFrom(value)
else:
Expand All @@ -59,11 +56,8 @@ def __iter__(self):

def __getitem__(self, item):
from ..document import Document
from ..document.uid import UniqueId
if isinstance(item, int):
return Document(self._docs_proto[item])
elif isinstance(item, UniqueId):
return Document(self._docs_map[str(item)])
elif isinstance(item, str):
return Document(self._docs_map[item])
elif isinstance(item, slice):
Expand Down
17 changes: 12 additions & 5 deletions tests/integration/crud/simple/test_crud.py
Expand Up @@ -29,19 +29,26 @@ def config(tmpdir):
def random_docs(start, end, embed_dim=10, jitter=1, has_content=True):
    """Yield ``Document``s with integer ids ``start`` .. ``end - 1``.

    :param start: first id (inclusive)
    :param end: last id (exclusive)
    :param embed_dim: base dimensionality of the random embedding
    :param jitter: exclusive upper bound of extra random dimensions per doc
    :param has_content: when True, also fill tags, text and embedding
    """
    for j in range(start, end):
        d = Document()
        # plain int — Document.id coerces values to str in this commit
        d.id = j
        if has_content:
            d.tags['id'] = j
            # NOTE(review): assigns utf-8 *bytes* to ``text`` — confirm the
            # Document type accepts bytes here
            d.text = ''.join(random.choice(string.ascii_lowercase) for _ in range(10)).encode('utf8')
            d.embedding = np.random.random([embed_dim + np.random.randint(0, jitter)])
        yield d


def get_ids_to_delete(start, end, as_string):
    """Return the ids in ``[start, end)``.

    :param start: first id (inclusive)
    :param end: last id (exclusive)
    :param as_string: when True, yield ids as strings; otherwise return the
        raw integer range
    """
    id_range = range(start, end)
    if not as_string:
        return id_range
    return (str(doc_id) for doc_id in id_range)


def validate_index_size(num_indexed_docs, compound=False):
from jina.executors.compound import CompoundExecutor

if compound:
path = Path(CompoundExecutor.get_component_workspace_from_compound_workspace(os.environ['JINA_TOPK_DIR'], 'chunk_indexer', 0))
path = Path(CompoundExecutor.get_component_workspace_from_compound_workspace(os.environ['JINA_TOPK_DIR'],
'chunk_indexer', 0))
else:
path = Path(os.environ['JINA_TOPK_DIR'])
bin_files = list(path.glob('*.bin'))
Expand Down Expand Up @@ -96,8 +103,8 @@ def validate_results(resp):
mock.assert_called_once()


@pytest.mark.parametrize('has_content', [True, False])
def test_delete_kv(config, mocker, has_content):
@pytest.mark.parametrize('as_string', [True, False])
def test_delete_kv(config, mocker, as_string):
flow_file = 'flow_kv.yml'

def validate_result_factory(num_matches):
Expand All @@ -117,7 +124,7 @@ def validate_results(resp):
mock.assert_called_once()

with Flow.load_config(flow_file) as index_flow:
index_flow.delete(input_fn=[d.id for d in random_docs(0, 3, has_content=has_content)])
index_flow.delete(input_fn=get_ids_to_delete(0, 3, as_string))
validate_index_size(7)

mock = mocker.Mock()
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/docidcache/test_crud_cache.py
Expand Up @@ -140,7 +140,7 @@ def check_docs(chunk_content, chunks, same_content, docs, ids_used, index_start=


def check_indexers_size(chunks, nr_docs, field, tmp_path, same_content, shards, post_op):
cache_indexer_path = tmp_path / 'cache.bin'
cache_indexer_path = os.path.join(tmp_path, 'cache.bin')
with BaseIndexer.load(cache_indexer_path) as cache:
assert isinstance(cache, DocCache)
cache_full_size = cache.size
Expand Down

0 comments on commit 4e1db76

Please sign in to comment.