crud integration tests: docidcache-centric (#1613)

* test: add cache-centric integration tests for crud

cristianmtr committed Jan 15, 2021
1 parent c35296e commit 90b779d

Showing 16 changed files with 573 additions and 42 deletions.
25 changes: 15 additions & 10 deletions jina/drivers/cache.py
@@ -1,7 +1,7 @@
 from typing import Any, Dict

 from .index import BaseIndexDriver
-from ..executors.indexers.cache import DATA_FIELD, CONTENT_HASH_KEY
+from ..executors.indexers.cache import DATA_FIELD, CONTENT_HASH_KEY, ID_KEY

 if False:
     from .. import Document
@@ -26,15 +26,20 @@ def __init__(self, with_serialization: bool = False, *args, **kwargs):
     def _apply_all(self, docs: 'DocumentSet', *args, **kwargs) -> None:
         self.field = self.exec.field

-        for d in docs:
-            data = d.id
-            if self.field == CONTENT_HASH_KEY:
-                data = d.content_hash
-            result = self.exec[data]
-            if result is None:
-                self.on_miss(d, data)
-            else:
-                self.on_hit(d, result)
+        if self._method_name == 'delete':
+            self.exec_fn([d.id for d in docs])
+        elif self._method_name == 'update':
+            self.exec_fn([d.id for d in docs], [d.id if self.field == ID_KEY else d.content_hash for d in docs])
+        else:
+            for d in docs:
+                data = d.id
+                if self.field == CONTENT_HASH_KEY:
+                    data = d.content_hash
+                result = self.exec[data]
+                if result is None:
+                    self.on_miss(d, data)
+                else:
+                    self.on_hit(d, result)

     def on_miss(self, doc: 'Document', data) -> None:
         """Function to call when doc is missing, the default behavior is add to cache when miss
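In words: delete and update requests now bypass the per-document cache lookup and hand the whole batch to the executor in one call; only the remaining request types keep the hit/miss loop. The following minimal sketch reproduces that dispatch outside Jina; FakeCacheExec and apply_all are hypothetical stand-ins for illustration, not Jina classes.

ID_KEY, CONTENT_HASH_KEY = 'id', 'content_hash'  # mirrors the keys imported above

class FakeCacheExec:
    """Toy executor: remembers cached field values in a set."""
    def __init__(self, field):
        self.field = field
        self.cached = set()

    def __getitem__(self, data):
        return data if data in self.cached else None

def apply_all(method_name, docs, exec_, exec_fn, on_hit, on_miss):
    # delete/update forward the whole batch; everything else checks hit/miss per doc
    if method_name == 'delete':
        exec_fn([d['id'] for d in docs])
    elif method_name == 'update':
        exec_fn([d['id'] for d in docs],
                [d['id'] if exec_.field == ID_KEY else d['content_hash'] for d in docs])
    else:
        for d in docs:
            data = d['content_hash'] if exec_.field == CONTENT_HASH_KEY else d['id']
            result = exec_[data]
            (on_miss if result is None else on_hit)(d, data)

docs = [{'id': '1', 'content_hash': 'aaa'}, {'id': '2', 'content_hash': 'bbb'}]
cache = FakeCacheExec(CONTENT_HASH_KEY)
cache.cached.add('aaa')
apply_all('index', docs, cache, None,
          on_hit=lambda d, data: print('hit', d['id']),
          on_miss=lambda d, data: print('miss', d['id'], data))
apply_all('delete', docs, cache, exec_fn=lambda ids: print('bulk delete', ids),
          on_hit=None, on_miss=None)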
25 changes: 12 additions & 13 deletions jina/drivers/search.py
@@ -121,17 +121,16 @@ def _apply_all(self, docs: 'DocumentSet', *args, **kwargs) -> None:
         self.logger.warning(f'these bad docs can not be added: {bad_docs}')
         idx, dist = self.exec_fn(embed_vecs, top_k=int(self.top_k))

-        if idx is None and dist is None:
-            return
-
         op_name = self.exec.__class__.__name__
-        for doc, topks, scores in zip(doc_pts, idx, dist):
-
-            topk_embed = fill_fn(topks) if (self._fill_embedding and fill_fn) else [None] * len(topks)
-            for numpy_match_id, score, vec in zip(topks, scores, topk_embed):
-                m = Document(id=int(numpy_match_id))
-                m.score = NamedScore(op_name=op_name,
-                                     value=score)
-                r = doc.matches.append(m)
-                if vec is not None:
-                    r.embedding = vec
+        # can be None if index is size 0
+        if idx is not None and dist is not None:
+            for doc, topks, scores in zip(doc_pts, idx, dist):
+
+                topk_embed = fill_fn(topks) if (self._fill_embedding and fill_fn) else [None] * len(topks)
+                for numpy_match_id, score, vec in zip(topks, scores, topk_embed):
+                    m = Document(id=int(numpy_match_id))
+                    m.score = NamedScore(op_name=op_name,
+                                         value=score)
+                    r = doc.matches.append(m)
+                    if vec is not None:
+                        r.embedding = vec
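The rewrite replaces an early return with a guard: a size-0 index makes exec_fn return (None, None), and the match-building loop is now simply skipped. A reduced sketch of why the guard is needed; build_matches is a hypothetical stand-in for the driver logic:

def build_matches(idx, dist):
    matches = []
    # can be None if the index has size 0; zip(None, None) would raise TypeError
    if idx is not None and dist is not None:
        for match_id, score in zip(idx, dist):
            matches.append({'id': int(match_id), 'score': float(score)})
    return matches

print(build_matches(None, None))          # [] -- empty index, no crash
print(build_matches([3, 7], [0.1, 0.4]))  # two matches, ids 3 and 7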
20 changes: 16 additions & 4 deletions jina/executors/indexers/__init__.py
@@ -162,17 +162,29 @@ def flush(self):
         except:
             pass

-    def _filter_nonexistent_keys(self, keys: Iterator, existent_keys: Iterator, check_path: str):
-        indices_to_drop = []
+    def _filter_nonexistent_keys_values(self, keys: Iterator, values: Iterator, existent_keys: Iterator, check_path: str) -> Tuple[List, List]:
+        keys = list(keys)
+        values = list(values)
+        indices_to_drop = self._get_indices_to_drop(keys, existent_keys, check_path)
+        keys = [keys[i] for i in range(len(keys)) if i not in indices_to_drop]
+        values = [values[i] for i in range(len(values)) if i not in indices_to_drop]
+        return keys, values
+
+    def _filter_nonexistent_keys(self, keys: Iterator, existent_keys: Iterator, check_path: str) -> List:
+        keys = list(keys)
+        indices_to_drop = self._get_indices_to_drop(keys, existent_keys, check_path)
+        keys = [keys[i] for i in range(len(keys)) if i not in indices_to_drop]
+        return keys
+
+    def _get_indices_to_drop(self, keys: List, existent_keys: Iterator, check_path: str):
+        indices_to_drop = []
         for key_index, key in enumerate(keys):
             if key not in existent_keys:
                 indices_to_drop.append(key_index)
         if indices_to_drop:
             self.logger.warning(
                 f'Key(s) {[keys[i] for i in indices_to_drop]} were not found in {check_path}. Continuing anyway...')
-        keys = [keys[i] for i in range(len(keys)) if i not in indices_to_drop]
-        return keys
+        return indices_to_drop


 class BaseVectorIndexer(BaseIndexer):
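The shared loop moves into _get_indices_to_drop, and a new keys-and-values variant lets update paths drop a value whenever its paired key is unknown. A standalone sketch of the three helpers, with self and the logger omitted:

from typing import Iterable, List, Tuple

def get_indices_to_drop(keys: List, existent_keys) -> List[int]:
    return [i for i, key in enumerate(keys) if key not in existent_keys]

def filter_nonexistent_keys(keys: Iterable, existent_keys) -> List:
    keys = list(keys)
    drop = set(get_indices_to_drop(keys, existent_keys))
    return [k for i, k in enumerate(keys) if i not in drop]

def filter_nonexistent_keys_values(keys: Iterable, values: Iterable, existent_keys) -> Tuple[List, List]:
    keys, values = list(keys), list(values)
    drop = set(get_indices_to_drop(keys, existent_keys))
    return ([k for i, k in enumerate(keys) if i not in drop],
            [v for i, v in enumerate(values) if i not in drop])

# key 'b' is unknown, so both it and its paired value are dropped
print(filter_nonexistent_keys_values(['a', 'b'], [b'1', b'2'], {'a'}))
# (['a'], [b'1'])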
22 changes: 13 additions & 9 deletions jina/executors/indexers/cache.py
@@ -67,7 +67,6 @@ def __init__(self, index_filename: str = None, *args, **kwargs):
             raise ValueError(f"Field '{self.field}' not in supported list of {self.supported_fields}")

     def add(self, doc_id: 'UniqueId', *args, **kwargs):
-        self._size += 1
         self.query_handler.ids.append(doc_id)

         # optimization. don't duplicate ids
@@ -76,6 +75,7 @@ def add(self, doc_id: 'UniqueId', *args, **kwargs):
         if data is None:
             raise ValueError(f'Got None from CacheDriver')
         self.query_handler.content_hash.append(data)
+        self._size += 1

     def query(self, data, *args, **kwargs) -> Optional[bool]:
         """
@@ -98,13 +98,15 @@ def update(self, keys: Iterator['UniqueId'], values: Iterator[any], *args, **kwargs):
         """
         :param keys: list of Document.id
         :param values: list of either `id` or `content_hash` of :class:`Document"""
-        keys = self._filter_nonexistent_keys(keys, self.query_handler.ids, self.save_abspath)
+        # if we don't cache anything else, no need
+        if self.field != ID_KEY:
+            keys, values = self._filter_nonexistent_keys_values(keys, values, self.query_handler.ids, self.save_abspath)

-        for key, cached_field in zip(keys, values):
-            key_idx = self.query_handler.ids.index(key)
-            # optimization. don't duplicate ids
-            if self.field != ID_KEY:
-                self.query_handler.content_hash[key_idx] = cached_field
+            for key, cached_field in zip(keys, values):
+                key_idx = self.query_handler.ids.index(key)
+                # optimization. don't duplicate ids
+                if self.field != ID_KEY:
+                    self.query_handler.content_hash[key_idx] = cached_field

     def delete(self, keys: Iterator['UniqueId'], *args, **kwargs):
         """
@@ -122,11 +124,13 @@ def delete(self, keys: Iterator['UniqueId'], *args, **kwargs):

     def get_add_handler(self):
         # not needed, as we use the queryhandler
-        pass
+        # FIXME better way to silence warnings
+        return 1

     def get_query_handler(self) -> CacheHandler:
         return self.CacheHandler(self.index_abspath, self.logger)

     def get_create_handler(self):
         # not needed, as we use the queryhandler
-        pass
+        # FIXME better way to silence warnings
+        return 1
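Three behavioral points in this file: _size is now incremented only after the content hash append succeeds, so a failed add no longer inflates the count; update filters keys and values in lockstep and does nothing when only IDs are cached; and the unused handlers return a dummy truthy value instead of pass to silence warnings (see the FIXME). A toy model of the update path, with plain lists standing in for the query handler; the names here are illustrative only:

ids = ['1', '2', '3']
content_hashes = ['aaa', 'bbb', 'ccc']

def update(keys, values, field='content_hash'):
    if field == 'id':
        return  # IDs are the only cached data; nothing to rewrite
    # drop pairs whose key is not cached, mirroring _filter_nonexistent_keys_values
    pairs = [(k, v) for k, v in zip(keys, values) if k in ids]
    for key, cached_field in pairs:
        content_hashes[ids.index(key)] = cached_field

update(['2', '9'], ['BBB', 'ZZZ'])
print(content_hashes)  # ['aaa', 'BBB', 'ccc'] -- unknown key '9' was skipped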
4 changes: 3 additions & 1 deletion jina/executors/indexers/keyvalue.py
@@ -55,6 +55,8 @@ def __init__(self, *args, **kwargs):
         self._page_size = mmap.ALLOCATIONGRANULARITY

     def add(self, keys: Iterator[int], values: Iterator[bytes], *args, **kwargs):
+        if len(keys) != len(values):
+            raise ValueError(f'Len of keys {len(keys)} did not match len of values {len(values)}')
         for key, value in zip(keys, values):
             l = len(value) #: the length
             p = int(self._start / self._page_size) * self._page_size #: offset of the page
@@ -78,7 +80,7 @@ def query(self, key: int) -> Optional[bytes]:
         return m[r:]

     def update(self, keys: Iterator[int], values: Iterator[bytes], *args, **kwargs):
-        keys = self._filter_nonexistent_keys(keys, self.query_handler.header.keys(), self.save_abspath)
+        keys, values = self._filter_nonexistent_keys_values(keys, values, self.query_handler.header.keys(), self.save_abspath)
         self._delete(keys)
         self.add(keys, values)
         return
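add now fails fast on mismatched input lengths; previously zip would silently truncate the longer side. A minimal sketch of the check:

def add(keys, values):
    if len(keys) != len(values):
        raise ValueError(f'Len of keys {len(keys)} did not match len of values {len(values)}')
    return dict(zip(keys, values))

try:
    add([1, 2, 3], [b'a', b'b'])
except ValueError as e:
    print(e)  # Len of keys 3 did not match len of values 2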
3 changes: 2 additions & 1 deletion jina/executors/indexers/vector.py
@@ -127,7 +127,8 @@ def add(self, keys: 'np.ndarray', vectors: 'np.ndarray', *args, **kwargs) -> None:
         self._size += keys.shape[0]

     def update(self, keys: Sequence[int], values: Sequence[bytes], *args, **kwargs) -> None:
-        keys = self._filter_nonexistent_keys(keys, self.ext2int_id.keys(), self.save_abspath)
+        # noinspection PyTypeChecker
+        keys, values = self._filter_nonexistent_keys_values(keys, values, self.ext2int_id.keys(), self.save_abspath)
         # could be empty
         # please do not use "if keys:", it wont work on both sequence and ndarray
         if getattr(keys, 'size', len(keys)):
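update now filters keys and values together. The emptiness guard keeps using getattr(keys, 'size', len(keys)) because, as the in-code comment warns, a plain "if keys:" does not behave uniformly across sequences and ndarrays. A short demonstration (assumes numpy is installed):

import numpy as np

def non_empty(keys) -> bool:
    # ndarrays expose .size; plain sequences fall back to len()
    return bool(getattr(keys, 'size', len(keys)))

print(non_empty([]))                       # False
print(non_empty([5, 7]))                   # True
print(non_empty(np.array([], dtype=int)))  # False
print(non_empty(np.array([5, 7])))         # True -- "if keys:" would raise ValueError here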
Empty file added tests/integration/cache.py
Empty file.
17 changes: 17 additions & 0 deletions tests/integration/docidcache/_merge_matches_topk.yml
@@ -0,0 +1,17 @@
+!BaseExecutor
+with: {}
+metas:
+  name: merge_matches_topk
+requests:
+  on:
+    [SearchRequest, TrainRequest, IndexRequest, DeleteRequest, UpdateRequest]:
+      - !ReduceAllDriver
+        with:
+          traversal_paths: ['m']
+      - !SliceQL
+        with:
+          start: 0
+          end: $JINA_TOPK
+          traversal_paths: ['m']
+    ControlRequest:
+      - !ControlReqDriver {}
29 changes: 29 additions & 0 deletions tests/integration/docidcache/cache.yml
@@ -0,0 +1,29 @@
+!DocIDCache
+with:
+  field: $JINA_CACHE_FIELD
+  index_filename: cache
+metas:
+  workspace: $JINA_TEST_CACHE_CRUD_WORKSPACE
+  name: cache
+requests:
+  on:
+    IndexRequest:
+      - !TaggingCacheDriver
+        with:
+          executor: cache
+          traversal_paths: [r, c]
+          tags:
+            is_indexed: true
+    UpdateRequest:
+      - !BaseCacheDriver
+        with:
+          method: update
+          executor: cache
+          traversal_paths: [r, c]
+    DeleteRequest:
+      - !BaseCacheDriver
+        with:
+          method: delete
+          executor: cache
+          traversal_paths: [r, c]

28 changes: 28 additions & 0 deletions tests/integration/docidcache/crud_cache_flow_index.yml
@@ -0,0 +1,28 @@
!Flow
pods:
cache:
uses: cache.yml
show_exc_info: true
vector:
uses: vector.yml
shards: $JINA_SHARDS
separated_workspace: True
uses_after: '_merge_matches'
polling: $JINA_POLLING
timeout_ready: '-1'
show_exc_info: true
needs: cache
kv:
uses: kv.yml
shards: $JINA_SHARDS
separated_workspace: True
uses_after: '_merge_matches'
polling: $JINA_POLLING
timeout_ready: '-1'
show_exc_info: true
needs: $JINA_KV_NEEDS
final:
needs: $JINA_MERGER_NEEDS
workspace: $JINA_TEST_CACHE_CRUD_WORKSPACE
show_exc_info: true

19 changes: 19 additions & 0 deletions tests/integration/docidcache/crud_cache_flow_query.yml
@@ -0,0 +1,19 @@
+!Flow
+pods:
+  vector:
+    uses: vector.yml
+    shards: $JINA_SHARDS
+    separated_workspace: True
+    uses_after: '_merge_matches_topk.yml'
+    polling: $JINA_POLLING
+    timeout_ready: '-1'
+    show_exc_info: true
+  kv:
+    uses: kv.yml
+    shards: $JINA_SHARDS
+    separated_workspace: True
+    uses_after: '_merge_matches_topk.yml'
+    polling: $JINA_POLLING
+    timeout_ready: '-1'
+    show_exc_info: true
+    needs: $JINA_KV_NEEDS
40 changes: 40 additions & 0 deletions tests/integration/docidcache/kv.yml
@@ -0,0 +1,40 @@
+!CompoundIndexer
+components:
+  - !BinaryPbIndexer
+    with:
+      index_filename: doc.gz
+    metas:
+      workspace: $JINA_TEST_CACHE_CRUD_WORKSPACE
+      name: $JINA_KV_IDX_NAME
+metas:
+  name: inc_docindexer
+  workspace: $JINA_TEST_CACHE_CRUD_WORKSPACE
+requests:
+  on:
+    UpdateRequest:
+      - !KVIndexDriver
+        with:
+          method: update
+          executor: $JINA_KV_IDX_NAME
+          traversal_paths: [r, c]
+    DeleteRequest:
+      - !KVIndexDriver
+        with:
+          method: delete
+          executor: $JINA_KV_IDX_NAME
+          traversal_paths: [r, c]
+    IndexRequest:
+      - !FilterQL
+        with:
+          lookups: {tags__is_indexed__neq: true}
+      - !KVIndexDriver
+        with:
+          executor: $JINA_KV_IDX_NAME
+          traversal_paths: [r, c]
+    SearchRequest:
+      - !KVSearchDriver
+        with:
+          is_merge: false
+          executor: $JINA_KV_IDX_NAME
+          top_k: $JINA_TOPK
+          traversal_paths: [m]
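The flow and executor YAMLs above interpolate environment variables that the integration test is expected to export before loading them. A hypothetical setup sketch; the variable names come from the YAMLs, but every value here is an assumption:

import os

os.environ['JINA_SHARDS'] = '3'                    # assumed shard count
os.environ['JINA_TOPK'] = '10'                     # assumed top-k
os.environ['JINA_POLLING'] = 'all'                 # assumed polling mode
os.environ['JINA_CACHE_FIELD'] = 'content_hash'    # or 'id'
os.environ['JINA_KV_IDX_NAME'] = 'kvidx'           # assumed indexer name
os.environ['JINA_KV_NEEDS'] = 'cache'              # assumed topology
os.environ['JINA_MERGER_NEEDS'] = '[vector, kv]'   # assumed topology
os.environ['JINA_TEST_CACHE_CRUD_WORKSPACE'] = '/tmp/cache_crud_ws'

# with the variables set, the flows can be loaded from the YAMLs, e.g.:
# from jina.flow import Flow
# with Flow.load_config('crud_cache_flow_index.yml') as f:
#     f.index(docs)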
