
crud integration tests: docidcache-centric #1613

Merged
13 commits · merged on Jan 15, 2021
25 changes: 15 additions & 10 deletions jina/drivers/cache.py
@@ -1,7 +1,7 @@
from typing import Any, Dict

from .index import BaseIndexDriver
from ..executors.indexers.cache import DATA_FIELD, CONTENT_HASH_KEY
from ..executors.indexers.cache import DATA_FIELD, CONTENT_HASH_KEY, ID_KEY

if False:
    from .. import Document
@@ -26,15 +26,20 @@ def __init__(self, with_serialization: bool = False, *args, **kwargs):
    def _apply_all(self, docs: 'DocumentSet', *args, **kwargs) -> None:
        self.field = self.exec.field

        for d in docs:
            data = d.id
            if self.field == CONTENT_HASH_KEY:
                data = d.content_hash
            result = self.exec[data]
            if result is None:
                self.on_miss(d, data)
            else:
                self.on_hit(d, result)
        if self._method_name == 'delete':
            self.exec_fn([d.id for d in docs])
        elif self._method_name == 'update':
            self.exec_fn([d.id for d in docs], [d.id if self.field == ID_KEY else d.content_hash for d in docs])
        else:
            for d in docs:
                data = d.id
                if self.field == CONTENT_HASH_KEY:
                    data = d.content_hash
                result = self.exec[data]
                if result is None:
                    self.on_miss(d, data)
                else:
                    self.on_hit(d, result)

    def on_miss(self, doc: 'Document', data) -> None:
        """Function to call when doc is missing, the default behavior is add to cache when miss
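The change above turns `_apply_all` into a dispatcher: `delete` and `update` now act on the whole batch through `exec_fn`, while indexing keeps the per-document hit/miss walk. A minimal standalone sketch of that control flow — `FakeDoc`, `apply_all` and the callbacks are illustrative stand-ins, not Jina's real classes:

```python
from dataclasses import dataclass

ID_KEY, CONTENT_HASH_KEY = 'id', 'content_hash'

@dataclass
class FakeDoc:
    id: str
    content_hash: str

def apply_all(docs, method_name, field, exec_fn, query_fn, on_hit, on_miss):
    """Mirrors the dispatch: batch ops for delete/update, per-doc walk otherwise."""
    if method_name == 'delete':
        exec_fn([d.id for d in docs])
    elif method_name == 'update':
        # values are ids or content hashes, depending on what the cache stores
        exec_fn([d.id for d in docs],
                [d.id if field == ID_KEY else d.content_hash for d in docs])
    else:
        for d in docs:
            data = d.content_hash if field == CONTENT_HASH_KEY else d.id
            result = query_fn(data)
            on_miss(d, data) if result is None else on_hit(d, result)

hits = []
apply_all([FakeDoc('d1', 'h1')], 'index', CONTENT_HASH_KEY,
          exec_fn=print, query_fn=lambda data: None,
          on_hit=lambda d, r: hits.append(d), on_miss=lambda d, data: hits.append(data))
print(hits)  # ['h1'] -- a cache miss, so the hash would be added to the cache
```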
25 changes: 12 additions & 13 deletions jina/drivers/search.py
@@ -121,17 +121,16 @@ def _apply_all(self, docs: 'DocumentSet', *args, **kwargs) -> None:
            self.logger.warning(f'these bad docs can not be added: {bad_docs}')
        idx, dist = self.exec_fn(embed_vecs, top_k=int(self.top_k))

        if idx is None and dist is None:
            return

        op_name = self.exec.__class__.__name__
        for doc, topks, scores in zip(doc_pts, idx, dist):

            topk_embed = fill_fn(topks) if (self._fill_embedding and fill_fn) else [None] * len(topks)
            for numpy_match_id, score, vec in zip(topks, scores, topk_embed):
                m = Document(id=int(numpy_match_id))
                m.score = NamedScore(op_name=op_name,
                                     value=score)
                r = doc.matches.append(m)
                if vec is not None:
                    r.embedding = vec
        # can be None if index is size 0
        if idx is not None and dist is not None:
            for doc, topks, scores in zip(doc_pts, idx, dist):

                topk_embed = fill_fn(topks) if (self._fill_embedding and fill_fn) else [None] * len(topks)
                for numpy_match_id, score, vec in zip(topks, scores, topk_embed):
                    m = Document(id=int(numpy_match_id))
                    m.score = NamedScore(op_name=op_name,
                                         value=score)
                    r = doc.matches.append(m)
                    if vec is not None:
                        r.embedding = vec
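The rewrite replaces the early `return` with a positive guard: the KNN `exec_fn` yields `(None, None)` when the index holds zero vectors, and only real results enter the match loop. A rough self-contained illustration of the pattern (plain NumPy, not the actual executor):

```python
import numpy as np

def knn(index_vecs: np.ndarray, queries: np.ndarray, top_k: int):
    """Stand-in for exec_fn: (None, None) signals an index of size 0."""
    if index_vecs.shape[0] == 0:
        return None, None
    dists = np.linalg.norm(index_vecs[None, :, :] - queries[:, None, :], axis=-1)
    idx = np.argsort(dists, axis=1)[:, :top_k]
    return idx, np.take_along_axis(dists, idx, axis=1)

idx, dist = knn(np.empty((0, 8)), np.random.rand(2, 8), top_k=3)
# can be None if index is size 0 -- same guard as in the driver
if idx is not None and dist is not None:
    for topks, scores in zip(idx, dist):
        print(topks, scores)
```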
20 changes: 16 additions & 4 deletions jina/executors/indexers/__init__.py
@@ -162,17 +162,29 @@ def flush(self):
        except:
            pass

    def _filter_nonexistent_keys(self, keys: Iterator, existent_keys: Iterator, check_path: str):
        indices_to_drop = []
    def _filter_nonexistent_keys_values(self, keys: Iterator, values: Iterator, existent_keys: Iterator, check_path: str) -> Tuple[List, List]:
        keys = list(keys)
        values = list(values)
        indices_to_drop = self._get_indices_to_drop(keys, existent_keys, check_path)
        keys = [keys[i] for i in range(len(keys)) if i not in indices_to_drop]
        values = [values[i] for i in range(len(values)) if i not in indices_to_drop]
        return keys, values

    def _filter_nonexistent_keys(self, keys: Iterator, existent_keys: Iterator, check_path: str) -> List:
        keys = list(keys)
        indices_to_drop = self._get_indices_to_drop(keys, existent_keys, check_path)
        keys = [keys[i] for i in range(len(keys)) if i not in indices_to_drop]
        return keys

    def _get_indices_to_drop(self, keys: List, existent_keys: Iterator, check_path: str):
        indices_to_drop = []
        for key_index, key in enumerate(keys):
            if key not in existent_keys:
                indices_to_drop.append(key_index)
        if indices_to_drop:
            self.logger.warning(
                f'Key(s) {[keys[i] for i in indices_to_drop]} were not found in {check_path}. Continuing anyway...')
        keys = [keys[i] for i in range(len(keys)) if i not in indices_to_drop]
        return keys
        return indices_to_drop


class BaseVectorIndexer(BaseIndexer):
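The refactor splits the old `_filter_nonexistent_keys` so the index-finding step can be shared: `_get_indices_to_drop` computes the positions of unknown keys once, and the two public helpers drop those positions from keys alone, or from keys and values in lockstep. A condensed, framework-free version showing the intended behavior (logging omitted):

```python
from typing import Iterable, List, Tuple

def get_indices_to_drop(keys: List, existent_keys: Iterable) -> List[int]:
    existent = set(existent_keys)
    return [i for i, key in enumerate(keys) if key not in existent]

def filter_nonexistent_keys_values(keys, values, existent_keys) -> Tuple[List, List]:
    keys, values = list(keys), list(values)
    drop = set(get_indices_to_drop(keys, existent_keys))
    # drop the same positions from both lists so they stay aligned
    keys = [k for i, k in enumerate(keys) if i not in drop]
    values = [v for i, v in enumerate(values) if i not in drop]
    return keys, values

print(filter_nonexistent_keys_values(
    ['a', 'b', 'c', 'd'], [1, 2, 3, 4], existent_keys=['a', 'c']))
# -> (['a', 'c'], [1, 3])  -- 'b' and 'd' were never indexed, so both lists shrink
```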
22 changes: 13 additions & 9 deletions jina/executors/indexers/cache.py
@@ -67,7 +67,6 @@ def __init__(self, index_filename: str = None, *args, **kwargs):
            raise ValueError(f"Field '{self.field}' not in supported list of {self.supported_fields}")

    def add(self, doc_id: 'UniqueId', *args, **kwargs):
        self._size += 1
        self.query_handler.ids.append(doc_id)

        # optimization. don't duplicate ids
@@ -76,6 +75,7 @@ def add(self, doc_id: 'UniqueId', *args, **kwargs):
            if data is None:
                raise ValueError(f'Got None from CacheDriver')
            self.query_handler.content_hash.append(data)
        self._size += 1
    def query(self, data, *args, **kwargs) -> Optional[bool]:
        """
@@ -98,13 +98,15 @@ def update(self, keys: Iterator['UniqueId'], values: Iterator[any], *args, **kwargs):
        """
        :param keys: list of Document.id
        :param values: list of either `id` or `content_hash` of :class:`Document`"""
        keys = self._filter_nonexistent_keys(keys, self.query_handler.ids, self.save_abspath)
        # if we don't cache anything else, no need
        if self.field != ID_KEY:
            keys, values = self._filter_nonexistent_keys_values(keys, values, self.query_handler.ids, self.save_abspath)

        for key, cached_field in zip(keys, values):
            key_idx = self.query_handler.ids.index(key)
            # optimization. don't duplicate ids
            if self.field != ID_KEY:
                self.query_handler.content_hash[key_idx] = cached_field
            for key, cached_field in zip(keys, values):
                key_idx = self.query_handler.ids.index(key)
                # optimization. don't duplicate ids
                if self.field != ID_KEY:
                    self.query_handler.content_hash[key_idx] = cached_field

    def delete(self, keys: Iterator['UniqueId'], *args, **kwargs):
        """
@@ -122,11 +124,13 @@ def delete(self, keys: Iterator['UniqueId'], *args, **kwargs):

    def get_add_handler(self):
        # not needed, as we use the queryhandler
        pass
        # FIXME better way to silence warnings
        return 1

    def get_query_handler(self) -> CacheHandler:
        return self.CacheHandler(self.index_abspath, self.logger)

    def get_create_handler(self):
        # not needed, as we use the queryhandler
        pass
        # FIXME better way to silence warnings
        return 1
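Putting the indexer changes together: `add` now increments `_size` only after the document is fully cached (so a `ValueError` mid-add no longer inflates the count), and `update` becomes a no-op when the cache stores ids only, since an id is immutable. A toy model of that update path — `TinyIdCache` is a stand-in for the real `CacheHandler`, not Jina's API:

```python
ID_KEY, CONTENT_HASH_KEY = 'id', 'content_hash'

class TinyIdCache:
    """Toy stand-in: parallel lists of ids and (optionally) content hashes."""
    def __init__(self, field: str):
        self.field = field
        self.ids, self.content_hash = [], []
        self._size = 0

    def add(self, doc_id: str, data: str = None):
        self.ids.append(doc_id)
        if self.field == CONTENT_HASH_KEY:
            if data is None:
                raise ValueError('Got None from CacheDriver')
            self.content_hash.append(data)
        self._size += 1  # only counted once everything succeeded

    def update(self, keys, values):
        if self.field != ID_KEY:  # ids never change; hashes can
            for key, cached_field in zip(keys, values):
                self.content_hash[self.ids.index(key)] = cached_field

cache = TinyIdCache(CONTENT_HASH_KEY)
cache.add('doc1', 'hash-v1')
cache.update(['doc1'], ['hash-v2'])
print(cache.content_hash)  # ['hash-v2']
```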
4 changes: 3 additions & 1 deletion jina/executors/indexers/keyvalue.py
@@ -55,6 +55,8 @@ def __init__(self, *args, **kwargs):
        self._page_size = mmap.ALLOCATIONGRANULARITY

    def add(self, keys: Iterator[int], values: Iterator[bytes], *args, **kwargs):
        if len(keys) != len(values):
            raise ValueError(f'Len of keys {len(keys)} did not match len of values {len(values)}')
        for key, value in zip(keys, values):
            l = len(value)  #: the length
            p = int(self._start / self._page_size) * self._page_size  #: offset of the page
@@ -78,7 +80,7 @@ def query(self, key: int) -> Optional[bytes]:
        return m[r:]

    def update(self, keys: Iterator[int], values: Iterator[bytes], *args, **kwargs):
        keys = self._filter_nonexistent_keys(keys, self.query_handler.header.keys(), self.save_abspath)
        keys, values = self._filter_nonexistent_keys_values(keys, values, self.query_handler.header.keys(), self.save_abspath)
        self._delete(keys)
        self.add(keys, values)
        return
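The new length check in `add` matters because `zip` silently truncates to the shorter argument, which would drop key/value pairs without any error. The idiom in isolation:

```python
from typing import List

def add(keys: List[int], values: List[bytes]):
    if len(keys) != len(values):
        raise ValueError(f'Len of keys {len(keys)} did not match len of values {len(values)}')
    for key, value in zip(keys, values):
        print(key, value)

add([1, 2], [b'foo', b'bar'])   # ok
# add([1, 2], [b'foo'])         # raises, instead of silently dropping key 2
```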
3 changes: 2 additions & 1 deletion jina/executors/indexers/vector.py
@@ -127,7 +127,8 @@ def add(self, keys: 'np.ndarray', vectors: 'np.ndarray', *args, **kwargs) -> None:
        self._size += keys.shape[0]

    def update(self, keys: Sequence[int], values: Sequence[bytes], *args, **kwargs) -> None:
        keys = self._filter_nonexistent_keys(keys, self.ext2int_id.keys(), self.save_abspath)
        # noinspection PyTypeChecker
        keys, values = self._filter_nonexistent_keys_values(keys, values, self.ext2int_id.keys(), self.save_abspath)
        # could be empty
        # please do not use "if keys:", it wont work on both sequence and ndarray
        if getattr(keys, 'size', len(keys)):
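The comment about `if keys:` deserves a note: `keys` here may be a plain sequence or an `np.ndarray`, and truth-testing an array of more than one element raises `ValueError: The truth value of an array ... is ambiguous`. The `getattr(keys, 'size', len(keys))` idiom works for both:

```python
import numpy as np

def has_items(keys) -> bool:
    # ndarray exposes .size; plain sequences fall back to len()
    return bool(getattr(keys, 'size', len(keys)))

print(has_items([]))                # False
print(has_items([3, 5]))            # True
print(has_items(np.array([])))      # False
print(has_items(np.array([3, 5])))  # True
# bool(np.array([3, 5])) would raise the ambiguity error
```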
Empty file added tests/integration/cache.py
Empty file.
17 changes: 17 additions & 0 deletions tests/integration/docidcache/_merge_matches_topk.yml
@@ -0,0 +1,17 @@
!BaseExecutor
with: {}
metas:
  name: merge_matches_topk
requests:
  on:
    [SearchRequest, TrainRequest, IndexRequest, DeleteRequest, UpdateRequest]:
      - !ReduceAllDriver
        with:
          traversal_paths: ['m']
      - !SliceQL
        with:
          start: 0
          end: $JINA_TOPK
          traversal_paths: ['m']
    ControlRequest:
      - !ControlReqDriver {}
29 changes: 29 additions & 0 deletions tests/integration/docidcache/cache.yml
@@ -0,0 +1,29 @@
!DocIDCache
with:
  field: $JINA_CACHE_FIELD
  index_filename: cache
metas:
  workspace: $JINA_TEST_CACHE_CRUD_WORKSPACE
  name: cache
requests:
  on:
    IndexRequest:
      - !TaggingCacheDriver
        with:
          executor: cache
          traversal_paths: [r, c]
          tags:
            is_indexed: true
    UpdateRequest:
      - !BaseCacheDriver
        with:
          method: update
          executor: cache
          traversal_paths: [r, c]
    DeleteRequest:
      - !BaseCacheDriver
        with:
          method: delete
          executor: cache
          traversal_paths: [r, c]

28 changes: 28 additions & 0 deletions tests/integration/docidcache/crud_cache_flow_index.yml
@@ -0,0 +1,28 @@
!Flow
pods:
  cache:
    uses: cache.yml
    show_exc_info: true
  vector:
    uses: vector.yml
    shards: $JINA_SHARDS
    separated_workspace: True
    uses_after: '_merge_matches'
    polling: $JINA_POLLING
    timeout_ready: '-1'
    show_exc_info: true
    needs: cache
  kv:
    uses: kv.yml
    shards: $JINA_SHARDS
    separated_workspace: True
    uses_after: '_merge_matches'
    polling: $JINA_POLLING
    timeout_ready: '-1'
    show_exc_info: true
    needs: $JINA_KV_NEEDS
  final:
    needs: $JINA_MERGER_NEEDS
    workspace: $JINA_TEST_CACHE_CRUD_WORKSPACE
    show_exc_info: true

19 changes: 19 additions & 0 deletions tests/integration/docidcache/crud_cache_flow_query.yml
@@ -0,0 +1,19 @@
!Flow
pods:
  vector:
    uses: vector.yml
    shards: $JINA_SHARDS
    separated_workspace: True
    uses_after: '_merge_matches_topk.yml'
    polling: $JINA_POLLING
    timeout_ready: '-1'
    show_exc_info: true
  kv:
    uses: kv.yml
    shards: $JINA_SHARDS
    separated_workspace: True
    uses_after: '_merge_matches_topk.yml'
    polling: $JINA_POLLING
    timeout_ready: '-1'
    show_exc_info: true
    needs: $JINA_KV_NEEDS
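These two flows are driven from the integration test by exporting the `$JINA_*` variables before loading the YAML. A sketch of how such a test might wire them up — import paths and call signatures follow the Jina API of that era and may differ between versions, and the document list is made up for illustration:

```python
import os
from jina import Document
from jina.flow import Flow

os.environ['JINA_SHARDS'] = '2'
os.environ['JINA_POLLING'] = 'any'
os.environ['JINA_TOPK'] = '10'
os.environ['JINA_CACHE_FIELD'] = 'content_hash'
os.environ['JINA_TEST_CACHE_CRUD_WORKSPACE'] = '/tmp/cache-crud'
# JINA_KV_NEEDS / JINA_MERGER_NEEDS rewire the pod graph per scenario

docs = [Document(content=f'doc {i}') for i in range(5)]

with Flow.load_config('crud_cache_flow_index.yml') as f:
    f.index(docs)   # duplicates would be filtered out via the cache

with Flow.load_config('crud_cache_flow_query.yml') as f:
    f.search(docs)
```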
40 changes: 40 additions & 0 deletions tests/integration/docidcache/kv.yml
@@ -0,0 +1,40 @@
!CompoundIndexer
components:
  - !BinaryPbIndexer
    with:
      index_filename: doc.gz
    metas:
      workspace: $JINA_TEST_CACHE_CRUD_WORKSPACE
      name: $JINA_KV_IDX_NAME
metas:
  name: inc_docindexer
  workspace: $JINA_TEST_CACHE_CRUD_WORKSPACE
requests:
  on:
    UpdateRequest:
      - !KVIndexDriver
        with:
          method: update
          executor: $JINA_KV_IDX_NAME
          traversal_paths: [r, c]
    DeleteRequest:
      - !KVIndexDriver
        with:
          method: delete
          executor: $JINA_KV_IDX_NAME
          traversal_paths: [r, c]
    IndexRequest:
      - !FilterQL
        with:
          lookups: {tags__is_indexed__neq: true}
      - !KVIndexDriver
        with:
          executor: $JINA_KV_IDX_NAME
          traversal_paths: [r, c]
    SearchRequest:
      - !KVSearchDriver
        with:
          is_merge: false
          executor: $JINA_KV_IDX_NAME
          top_k: $JINA_TOPK
          traversal_paths: [m]
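Note how `cache.yml` and `kv.yml` cooperate on IndexRequest: the `TaggingCacheDriver` stamps documents already present in the cache with `tags.is_indexed: true`, and the `FilterQL` lookup `tags__is_indexed__neq: true` then lets only unseen documents reach the `BinaryPbIndexer`. Roughly, in plain Python — an illustrative model, not Jina's QL engine:

```python
docs = [
    {'id': 'a', 'tags': {'is_indexed': True}},  # cache hit: tagged upstream
    {'id': 'b', 'tags': {}},                    # cache miss: never seen
]

# FilterQL with lookups {tags__is_indexed__neq: true}:
# keep documents whose tags.is_indexed is anything but True (including unset)
to_index = [d for d in docs if d['tags'].get('is_indexed') is not True]
print([d['id'] for d in to_index])  # ['b'] -- only the new document is indexed
```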