Skip to content

Commit

Permalink
refactor: doc.id can now be arbitrary string (#1837)
Browse files Browse the repository at this point in the history
* refactor: doc.id can now be an arbitrary string

* feat(client): flow & client now accepts single doc

* feat(types): add .json() and .dict() to score, doc, querylang
  • Loading branch information
hanxiao committed Feb 3, 2021
1 parent b295c56 commit 39116ee
Show file tree
Hide file tree
Showing 26 changed files with 172 additions and 70 deletions.
9 changes: 6 additions & 3 deletions jina/clients/request/__init__.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
__copyright__ = "Copyright (c) 2020 Jina AI Limited. All rights reserved."
__license__ = "Apache-2.0"

from typing import Iterator, Union, Tuple, AsyncIterator
from typing import Iterator, Union, Tuple, AsyncIterator, Iterable

from .helper import _new_request_from_batch
from ... import Request
from ...enums import RequestType, DataInputType
from ...helper import batch_iterator
from ...logging import default_logger
from ...types.document import DocumentSourceType, DocumentContentType
from ...types.document import DocumentSourceType, DocumentContentType, Document
from ...types.sets.querylang import AcceptQueryLangType

SingletonDataType = Union[DocumentContentType,
DocumentSourceType,
Document,
Tuple[DocumentContentType, DocumentContentType],
Tuple[DocumentSourceType, DocumentSourceType]]

GeneratorSourceType = Union[Iterator[SingletonDataType], AsyncIterator[SingletonDataType]]
GeneratorSourceType = Union[Document, Iterator[SingletonDataType], AsyncIterator[SingletonDataType]]


def request_generator(data: GeneratorSourceType,
Expand All @@ -36,6 +37,8 @@ def request_generator(data: GeneratorSourceType,
_kwargs = dict(mime_type=mime_type, length=request_size, weight=1.0)

try:
if not isinstance(data, Iterable):
data = [data]
for batch in batch_iterator(data, request_size):
yield _new_request_from_batch(_kwargs, batch, data_type, mode, queryset)

Expand Down
19 changes: 19 additions & 0 deletions jina/executors/indexers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,25 @@ def __init__(self,
super().__init__(*args, **kwargs)
self.index_filename = index_filename #: the file name of the stored index, no path is required
self._size = 0
self._key_length = 0 #: the length of the key

@property
def key_length(self) -> int:
    """Max length (in characters) of the keys this indexer accepts."""
    return self._key_length

@key_length.setter
def key_length(self, val: int):
    """Set the max key length.

    The first assignment fixes the length for the lifetime of the indexer;
    afterwards, a smaller ``val`` only triggers padding (warning), while a
    larger ``val`` is rejected because stored keys have a fixed width.

    :param val: the max key length observed in the current batch
    :raises ValueError: if ``val`` exceeds the already-fixed key length
    """
    if not self._key_length:
        # first assignment: fix the key width
        self._key_length = val
    elif val < self._key_length:
        # just padding, no big deal
        self.logger.warning(
            f'key padding is triggered. this indexer allows only keys at length {self._key_length}, '
            f'but your max key length is {val}.')
    elif val > self._key_length:
        # panic
        raise ValueError(f'this indexer allows only keys at length {self._key_length}, but yours is {val}')

def add(self, *args, **kwargs):
raise NotImplementedError
Expand Down
22 changes: 15 additions & 7 deletions jina/executors/indexers/keyvalue.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@ def flush(self):
class ReadHandler:
def __init__(self, path, key_length):
with open(path + '.head', 'rb') as fp:
tmp = np.frombuffer(fp.read(), dtype=[('', (np.str_, key_length)), ('', np.int64), ('', np.int64), ('', np.int64)])
self.header = {r[0]: None if np.array_equal((r[1], r[2], r[3]), HEADER_NONE_ENTRY) else (r[1], r[2], r[3]) for r in tmp}
tmp = np.frombuffer(fp.read(),
dtype=[('', (np.str_, key_length)), ('', np.int64), ('', np.int64), ('', np.int64)])
self.header = {
r[0]: None if np.array_equal((r[1], r[2], r[3]), HEADER_NONE_ENTRY) else (r[1], r[2], r[3]) for r in
tmp}
self._body = open(path, 'r+b')
self.body = self._body.fileno()

Expand All @@ -48,16 +51,20 @@ def get_create_handler(self):
def get_query_handler(self):
return self.ReadHandler(self.index_abspath, self._key_length)

def __init__(self, key_length: int = 16, *args, **kwargs):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._total_byte_len = 0
self._start = 0
self._page_size = mmap.ALLOCATIONGRANULARITY
self._key_length = key_length
self._key_length = 0

def add(self, keys: Iterator[str], values: Iterator[bytes], *args, **kwargs):
if len(list(keys)) != len(list(values)):
raise ValueError(f'Len of keys {len(keys)} did not match len of values {len(values)}')
if not keys:
return

max_key_len = max([len(k) for k in keys])
self.key_length = max_key_len

for key, value in zip(keys, values):
l = len(value) #: the length
p = int(self._start / self._page_size) * self._page_size #: offset of the page
Expand All @@ -81,7 +88,8 @@ def query(self, key: str) -> Optional[bytes]:
return m[r:]

def update(self, keys: Iterator[str], values: Iterator[bytes], *args, **kwargs):
keys, values = self._filter_nonexistent_keys_values(keys, values, self.query_handler.header.keys(), self.save_abspath)
keys, values = self._filter_nonexistent_keys_values(keys, values, self.query_handler.header.keys(),
self.save_abspath)
self._delete(keys)
self.add(keys, values)
return
Expand Down
17 changes: 9 additions & 8 deletions jina/executors/indexers/vector.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ class BaseNumpyIndexer(BaseVectorIndexer):
def __init__(self,
compress_level: int = 1,
ref_indexer: Optional['BaseNumpyIndexer'] = None,
key_length: int = 16,
*args, **kwargs):
"""
:param compress_level: The compresslevel argument is an integer from 0 to 9 controlling the
Expand All @@ -49,7 +48,6 @@ def __init__(self,
self.dtype = None
self.compress_level = compress_level
self.key_bytes = b''
self.key_dtype = (np.str_, key_length)
self.valid_indices = np.array([], dtype=bool)
self.ref_indexer_workspace_name = None

Expand All @@ -59,7 +57,7 @@ def __init__(self,
self.dtype = ref_indexer.dtype
self.compress_level = ref_indexer.compress_level
self.key_bytes = ref_indexer.key_bytes
self.key_dtype = ref_indexer.key_dtype
self._key_length = ref_indexer._key_length
self._size = ref_indexer._size
# point to the ref_indexer.index_filename
# so that later in `post_init()` it will load from the referred index_filename
Expand Down Expand Up @@ -123,10 +121,13 @@ def _validate_key_vector_shapes(self, keys, vectors):
raise ValueError(f'number of key {keys.shape[0]} not equal to number of vectors {vectors.shape[0]}')

def add(self, keys: Iterator[str], vectors: 'np.ndarray', *args, **kwargs) -> None:
np_keys = np.array(keys, dtype=self.key_dtype)
max_key_len = max([len(k) for k in keys])
self.key_length = max_key_len
np_keys = np.array(keys, (np.str_, self.key_length))

self._add(np_keys, vectors)

def _add(self, keys: 'np.ndarry', vectors: 'np.ndarray'):
def _add(self, keys: 'np.ndarray', vectors: 'np.ndarray'):
self._validate_key_vector_shapes(keys, vectors)
self.write_handler.write(vectors.tobytes())
self.valid_indices = np.concatenate((self.valid_indices, np.full(len(keys), True)))
Expand All @@ -136,7 +137,7 @@ def _add(self, keys: 'np.ndarry', vectors: 'np.ndarray'):
def update(self, keys: Iterator[str], values: Sequence[bytes], *args, **kwargs) -> None:
# noinspection PyTypeChecker
keys, values = self._filter_nonexistent_keys_values(keys, values, self._ext2int_id.keys(), self.save_abspath)
np_keys = np.array(keys, dtype=self.key_dtype)
np_keys = np.array(keys, (np.str_, self.key_length))

if np_keys.size:
self._delete(np_keys)
Expand All @@ -151,7 +152,7 @@ def _delete(self, keys):

def delete(self, keys: Iterator[str], *args, **kwargs) -> None:
keys = self._filter_nonexistent_keys(keys, self._ext2int_id.keys(), self.save_abspath)
np_keys = np.array(keys, dtype=self.key_dtype)
np_keys = np.array(keys, (np.str_, self.key_length))
self._delete(np_keys)

def get_query_handler(self) -> Optional['np.ndarray']:
Expand Down Expand Up @@ -216,7 +217,7 @@ def query_by_key(self, keys: Sequence[str], *args, **kwargs) -> Optional['np.nda
def _int2ext_id(self) -> Optional['np.ndarray']:
"""Convert internal ids (0,1,2,3,4,...) to external ids (random index) """
if self.key_bytes:
r = np.frombuffer(self.key_bytes, dtype=self.key_dtype)
r = np.frombuffer(self.key_bytes, dtype=(np.str_, self.key_length))
# `==` is required. `is False` does not work in np
deleted_keys = len(self.valid_indices[self.valid_indices == False]) # noqa
if r.shape[0] == (self.size + deleted_keys) == self._raw_ndarray.shape[0]:
Expand Down
19 changes: 15 additions & 4 deletions jina/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,12 +223,23 @@ def _get_port(port=0):
return _port


def random_identity(use_uuid1: bool = False) -> str:
    """Generate a random UUID as a string. This is the default Document ID generator.

    :param use_uuid1: use UUID1 (time/MAC-based) instead of UUID4 (random).

    .. note::
        A MAC address or time-based ordering (UUID1) can afford increased database performance, since it's less work
        to sort numbers closer-together than those distributed randomly (UUID4).

        A second related issue, is that using UUID1 can be useful in debugging, even if origin data is lost or not
        explicitly stored.
    """
    return str(random_uuid(use_uuid1))


def random_uuid(use_uuid1: bool = False) -> uuid.UUID:
    """Return a random :class:`uuid.UUID`.

    :param use_uuid1: when ``True``, generate a time-based UUID1; otherwise a random UUID4.
    """
    return uuid.uuid1() if use_uuid1 else uuid.uuid4()


def expand_env_var(v: str) -> Optional[Union[bool, int, str, list, float]]:
Expand Down
10 changes: 5 additions & 5 deletions jina/peapods/peas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def __init__(self, args: 'argparse.Namespace'):
self.runtime = self._get_runtime_cls()(self.args) # type: 'BaseRuntime'
except Exception as ex:
self.logger.error(f'{ex!r} during {self.runtime_cls.__init__!r}' +
f'add "--hide-exc-info" if you do not want to see the exception stack in details'
f'\n add "--hide-exc-info" to suppress the exception details'
if not self.args.hide_exc_info else '',
exc_info=not self.args.hide_exc_info)
raise RuntimeFailToStart from ex
Expand All @@ -66,7 +66,7 @@ def run(self):
self.runtime.setup()
except Exception as ex:
self.logger.error(f'{ex!r} during {self.runtime.setup!r}' +
f'add "--hide-exc-info" if you do not want to see the exception stack in details'
f'\n add "--hide-exc-info" to suppress the exception details'
if not self.args.hide_exc_info else '',
exc_info=not self.args.hide_exc_info)
else:
Expand All @@ -79,15 +79,15 @@ def run(self):
self.logger.info(f'{self.runtime!r} is interrupted by user')
except (Exception, SystemError) as ex:
self.logger.error(f'{ex!r} during {self.runtime.run_forever!r}' +
f'add "--hide-exc-info" if you do not want to see the exception stack in details'
f'\n add "--hide-exc-info" to suppress the exception details'
if not self.args.hide_exc_info else '',
exc_info=not self.args.hide_exc_info)

try:
self.runtime.teardown()
except Exception as ex:
self.logger.error(f'{ex!r} during {self.runtime.teardown!r}' +
f'add "--hide-exc-info" if you do not want to see the exception stack in details'
f'\n add "--hide-exc-info" to suppress the exception details'
if not self.args.hide_exc_info else '',
exc_info=not self.args.hide_exc_info)
finally:
Expand Down Expand Up @@ -138,7 +138,7 @@ def close(self) -> None:
self.is_shutdown.wait()
except Exception as ex:
self.logger.error(f'{ex!r} during {self.runtime.cancel!r}' +
f'add "--hide-exc-info" if you do not want to see the exception stack in details'
f'\n add "--hide-exc-info" to suppress the exception details'
if not self.args.hide_exc_info else '',
exc_info=not self.args.hide_exc_info)

Expand Down
4 changes: 2 additions & 2 deletions jina/peapods/runtimes/zmq/zed.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,13 @@ def _msg_callback(self, msg: 'Message') -> None:
if isinstance(ex, ChainedPodException):
msg.add_exception()
self.logger.error(f'{ex!r}' +
f'add "--hide-exc-info" if you do not want to see the exception stack in details'
f'\n add "--hide-exc-info" to suppress the exception details'
if not self.args.hide_exc_info else '',
exc_info=not self.args.hide_exc_info)
else:
msg.add_exception(ex, executor=getattr(self, '_executor'))
self.logger.error(f'{ex!r}' +
f'add "--hide-exc-info" if you do not want to see the exception stack in details'
f'\n add "--hide-exc-info" to suppress the exception details'
if not self.args.hide_exc_info else '',
exc_info=not self.args.hide_exc_info)

Expand Down
16 changes: 16 additions & 0 deletions jina/resources/executors._index.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Default index executor: a compound indexer that stores, side by side,
# the embedding vectors (for similarity search) and the serialized
# documents (for key-value lookup).
!CompoundIndexer
components:
  - !NumpyIndexer
    with:
      index_filename: vec.gz
      metric: euclidean  # distance metric used at query time
    metas:
      name: vecidx # a customized name
  - !BinaryPbIndexer
    with:
      index_filename: chunk.gz
    metas:
      name: docidx
metas:
  name: simple_indexer
  workspace: ./
38 changes: 28 additions & 10 deletions jina/types/document/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@
import urllib.parse
import urllib.request
import warnings
import numpy as np
from hashlib import blake2b
from typing import Union, Dict, Optional, TypeVar, Any, Callable, Sequence, Tuple

import numpy as np
from google.protobuf import json_format
from google.protobuf.field_mask_pb2 import FieldMask
from google.protobuf.json_format import MessageToJson, MessageToDict

from .converters import png_to_buffer, to_datauri, guess_mime
from .uid import DIGEST_SIZE, UniqueId
Expand All @@ -18,7 +19,7 @@
from ..sets.chunk import ChunkSet
from ..sets.match import MatchSet
from ...excepts import BadDocType
from ...helper import is_url, typename
from ...helper import is_url, typename, random_identity
from ...importer import ImportExtensions
from ...proto import jina_pb2

Expand Down Expand Up @@ -143,8 +144,7 @@ def __init__(self, document: Optional[DocumentSourceType] = None,
f'you may use "Document(content=your_content)"') from ex

if self._document.id is None or not self._document.id:
import random
self.id = random.randint(0, np.iinfo(np.int64).max)
self.id = random_identity(use_uuid1=True)

self.set_attrs(**kwargs)

Expand Down Expand Up @@ -217,18 +217,18 @@ def update_content_hash(self,
self._document.content_hash = blake2b(masked_d.SerializeToString(), digest_size=DIGEST_SIZE).hexdigest()

@property
def id(self) -> 'UniqueId':
def id(self) -> str:
"""The document id in hex string, for non-binary environment such as HTTP, CLI, HTML and also human-readable.
it will be used as the major view.
"""
return UniqueId(self._document.id)
return self._document.id

@property
def parent_id(self) -> 'UniqueId':
def parent_id(self) -> str:
"""The document's parent id in hex string, for non-binary environment such as HTTP, CLI, HTML and also human-readable.
it will be used as the major view.
"""
return UniqueId(self._document.parent_id)
return self._document.parent_id

@id.setter
def id(self, value: Union[bytes, str, int]):
Expand All @@ -244,7 +244,12 @@ def id(self, value: Union[bytes, str, int]):
:param value: restricted string value
:return:
"""
self._document.id = UniqueId(value)
if isinstance(value, str):
self._document.id = value
else:
warnings.warn(f'expecting a string as ID, receiving {type(value)}. '
f'Note this type will be deprecated soon', DeprecationWarning)
self._document.id = UniqueId(value)

@parent_id.setter
def parent_id(self, value: Union[bytes, str, int]):
Expand All @@ -260,7 +265,12 @@ def parent_id(self, value: Union[bytes, str, int]):
:param value: restricted string value
:return:
"""
self._document.parent_id = UniqueId(value)
if isinstance(value, str):
self._document.parent_id = value
else:
warnings.warn(f'expecting a string as ID, receiving {type(value)}. '
f'Note this type will be deprecated soon', DeprecationWarning)
self._document.parent_id = UniqueId(value)

@property
def blob(self) -> 'np.ndarray':
Expand Down Expand Up @@ -613,3 +623,11 @@ def _traverse_rec(self, docs: Sequence['Document'], parent_doc: Optional['Docume
else:
for d in docs:
callback_fn(d, parent_doc, parent_edge_type, *args, **kwargs)

def json(self) -> str:
    """Return the Document serialized to a JSON string, via the protobuf JSON mapping of the underlying message."""
    return MessageToJson(self._document)

def dict(self) -> Dict:
    """Return the Document as a Python :class:`dict`, via the protobuf dict mapping of the underlying message."""
    return MessageToDict(self._document)

0 comments on commit 39116ee

Please sign in to comment.