test: add cache-centric integration tests for crud
cristianmtr committed Jan 7, 2021
1 parent 51d7e50 commit 3208f86
Showing 12 changed files with 412 additions and 17 deletions.
25 changes: 15 additions & 10 deletions jina/drivers/cache.py
@@ -1,7 +1,7 @@
 from typing import Any, Dict

 from .index import BaseIndexDriver
-from ..executors.indexers.cache import DATA_FIELD, CONTENT_HASH_KEY
+from ..executors.indexers.cache import DATA_FIELD, CONTENT_HASH_KEY, ID_KEY

 if False:
     from .. import Document
@@ -26,15 +26,20 @@ def __init__(self, with_serialization: bool = False, *args, **kwargs):
     def _apply_all(self, docs: 'DocumentSet', *args, **kwargs) -> None:
         self.field = self.exec.field

-        for d in docs:
-            data = d.id
-            if self.field == CONTENT_HASH_KEY:
-                data = d.content_hash
-            result = self.exec[data]
-            if result is None:
-                self.on_miss(d, data)
-            else:
-                self.on_hit(d, result)
+        if self._method_name == 'delete':
+            self.exec_fn([d.id for d in docs])
+        elif self._method_name == 'update':
+            self.exec_fn([d.id for d in docs], [d.id if self.field == ID_KEY else d.content_hash for d in docs])
+        else:
+            for d in docs:
+                data = d.id
+                if self.field == CONTENT_HASH_KEY:
+                    data = d.content_hash
+                result = self.exec[data]
+                if result is None:
+                    self.on_miss(d, data)
+                else:
+                    self.on_hit(d, result)

     def on_miss(self, doc: 'Document', data) -> None:
         """Function to call when doc is missing, the default behavior is add to cache when miss
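
To make the new dispatch concrete, here is a minimal standalone sketch (plain Python, not Jina's actual driver classes) of what the cache executor's exec_fn receives per request type after this change; FakeDoc and the literal values are illustrative assumptions only.

from collections import namedtuple

# Hypothetical stand-in for jina.Document, reduced to the two fields the driver reads.
FakeDoc = namedtuple('FakeDoc', 'id content_hash')

docs = [FakeDoc(id='1', content_hash='aaa'), FakeDoc(id='2', content_hash='bbb')]
field = 'content_hash'  # mirrors the DocIDCache `field` setting

# delete: the driver now forwards only the ids of the incoming docs in a single call
delete_args = ([d.id for d in docs],)

# update: the ids plus the new cached value per doc (id or content_hash, depending on `field`)
update_args = ([d.id for d in docs],
               [d.id if field == 'id' else d.content_hash for d in docs])

print(delete_args)  # (['1', '2'],)
print(update_args)  # (['1', '2'], ['aaa', 'bbb'])
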
12 changes: 7 additions & 5 deletions jina/executors/indexers/cache.py
@@ -101,7 +101,7 @@ def update(self, keys: Iterator['UniqueId'], values: Iterator[any], *args, **kwa
         :param values: list of either `id` or `content_hash` of :class:`Document"""
         missed = check_keys_exist(keys, self.query_handler.ids)
         if missed:
-            raise KeyError(f'Keys {missed} were not found in {self.index_abspath}. No operation performed...')
+            raise KeyError(f'Keys {missed} were not found in {self.save_abspath}. No operation performed...')

         for key, cached_field in zip(keys, values):
             key_idx = self.query_handler.ids.index(key)
@@ -115,7 +115,7 @@ def delete(self, keys: Iterator['UniqueId'], *args, **kwargs):
"""
missed = check_keys_exist(keys, self.query_handler.ids)
if missed:
raise KeyError(f'Keys {missed} were not found in {self.index_abspath}. No operation performed...')
raise KeyError(f'Keys {missed} were not found in {self.save_abspath}. No operation performed...')

for key in keys:
key_idx = self.query_handler.ids.index(key)
@@ -127,11 +127,13 @@ def delete(self, keys: Iterator['UniqueId'], *args, **kwargs):

     def get_add_handler(self):
         # not needed, as we use the queryhandler
-        pass
+        # FIXME better way to silence warnings
+        return 1

     def get_query_handler(self) -> CacheHandler:
-        return self.CacheHandler(self.index_abspath, self.logger)
+        return self.CacheHandler(self.save_abspath, self.logger)

     def get_create_handler(self):
         # not needed, as we use the queryhandler
-        pass
+        # FIXME better way to silence warnings
+        return 1
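
Both update and delete above rely on the same guard: verify every key first and raise before mutating anything. A minimal standalone sketch of that pattern follows (this is not the DocIDCache implementation itself; check_keys_exist and delete_from_cache are local stand-ins, and 'cache.bin' is an illustrative path):

def check_keys_exist(keys, existing_ids):
    """Return the subset of `keys` not present in `existing_ids`."""
    return [k for k in keys if k not in existing_ids]


def delete_from_cache(keys, ids, values, save_abspath='cache.bin'):
    # Guard first: refuse a partial delete if any key is unknown.
    missed = check_keys_exist(keys, ids)
    if missed:
        raise KeyError(f'Keys {missed} were not found in {save_abspath}. No operation performed...')
    for key in keys:
        idx = ids.index(key)
        del ids[idx]
        del values[idx]


ids, values = ['1', '2', '3'], ['aaa', 'bbb', 'ccc']
delete_from_cache(['2'], ids, values)
print(ids, values)  # ['1', '3'] ['aaa', 'ccc']
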
Empty file added tests/integration/cache.py
Empty file.
257 changes: 257 additions & 0 deletions tests/integration/docidcache/test_crud_cache.py
@@ -0,0 +1,257 @@
import os

import numpy as np
import pytest

from jina import Flow, Document
from jina.executors.indexers import BaseIndexer
from jina.executors.indexers.cache import DocIDCache
from jina.executors.indexers.keyvalue import BinaryPbIndexer
from jina.executors.indexers.vector import NumpyIndexer
from tests.integration.incremental_indexing import random_workspace
from pathlib import Path

cur_dir = Path(os.path.dirname(os.path.abspath(__file__)))

# don't remove this line, otherwise auto-code-format will remove `random_workspace`
print(random_workspace)

KV_IDX_FILENAME = 'kv_idx.bin'
VEC_IDX_FILENAME = 'vec_idx.bin'


def config(field, tmp_workspace):
    os.environ['JINA_CACHE_FIELD'] = field
    os.environ['JINA_TEST_CACHE_CRUD_WORKSPACE'] = str(tmp_workspace)
    os.environ['JINA_KV_IDX_FILENAME'] = KV_IDX_FILENAME.split('.bin')[0]
    os.environ['JINA_VEC_IDX_FILENAME'] = VEC_IDX_FILENAME.split('.bin')[0]


def get_flow(indexers, shards):
    vector_pod = dict(
        uses=cur_dir / 'yml' / 'cp_cache_pb.yml',
        name='inc_vec'
    )
    pb_pod = dict(
        uses=cur_dir / 'yml' / 'cp_cache_vector.yml',
        name='inc_doc'
    )

    if shards == 3:
        sharding_config = dict(
            shards=shards,
            separated_workspace=True,
            uses_after='_merge_matches',
            polling='all',
            timeout_ready='-1'
        )
        pb_pod.update(
            sharding_config
        )
        vector_pod.update(
            sharding_config
        )

    f: Flow = (Flow()
               .add(**pb_pod)
               .add(**vector_pod))

    if indexers == 'parallel':
        vector_pod.update(
            dict(
                needs=['gateway']
            )
        )
        f = (Flow()
             .add(**pb_pod)
             .add(**vector_pod)
             .add(needs=['inc_vec', 'inc_doc']))
    flow_plot = os.path.join(os.environ.get('JINA_TEST_CACHE_CRUD_WORKSPACE'), f'flow-{indexers}-{shards}.svg')
    f.plot(flow_plot)
    print(f'Flow plotted to {flow_plot}')
    return f


d_embedding = np.random.random([9])
c_embedding = np.random.random([9])


def get_documents(chunks, content_same, nr=10):
    next_chunk_id = nr
    for i in range(nr):
        with Document() as d:
            d.id = i
            if content_same:
                d.text = 'hello world'
                d.embedding = d_embedding
            else:
                d.text = f'hello world {i}'
                d.embedding = np.random.random([9])
            for j in range(chunks):
                with Document() as c:
                    c.id = next_chunk_id
                    if content_same:
                        c.text = 'hello world from chunk'
                        c.embedding = c_embedding
                    else:
                        c.text = f'hello world from chunk {j}'
                        c.embedding = np.random.random([9])

                next_chunk_id += 1
                d.chunks.append(c)
        yield d


@pytest.mark.parametrize('chunks, content_same, nr',
                         # cartesian product of possibilities
                         [(0, False, 0),
                          (0, False, 10),
                          (0, False, 100),
                          (0, True, 0),
                          (0, True, 10),
                          (0, True, 100),
                          (3, False, 0),
                          (3, False, 10),
                          (3, False, 100),
                          (3, True, 0),
                          (3, True, 10),
                          (3, True, 100),
                          (5, False, 0),
                          (5, False, 10),
                          (5, False, 100),
                          (5, True, 0),
                          (5, True, 10),
                          (5, True, 100)])
def test_docs_generator(chunks, content_same, nr):
    docs = list(get_documents(chunks=chunks, content_same=content_same, nr=nr))
    assert len(docs) == nr
    ids_used = set()
    for i, d in enumerate(docs):
        assert d.id not in ids_used
        ids_used.add(d.id)

        if content_same:
            assert d.text == 'hello world'
            np.testing.assert_almost_equal(d.embedding, d_embedding)
        else:
            assert d.text == f'hello world {i}'
            assert d.embedding.shape == d_embedding.shape

        assert len(d.chunks) == chunks

        for j, c in enumerate(d.chunks):
            assert c.id not in ids_used
            ids_used.add(c.id)
            if content_same:
                assert c.text == 'hello world from chunk'
                np.testing.assert_almost_equal(c.embedding, c_embedding)
            else:
                assert c.text == f'hello world from chunk {j}'
                assert c.embedding.shape == c_embedding.shape
    print(f'{len(ids_used)=}')


@pytest.mark.parametrize('indexers, field, shards, chunks, same_content',
                         [
                             # cartesian product of the parameters
                             # TODO prune these
                             # ('sequential', 'id', 1, 0, False),
                             # ('sequential', 'id', 1, 0, True),
                             ('sequential', 'id', 1, 5, False),
                             # ('sequential', 'id', 1, 5, True),
                             # ('sequential', 'id', 3, 0, False),
                             # ('sequential', 'id', 3, 0, True),
                             # ('sequential', 'id', 3, 5, False),
                             # ('sequential', 'id', 3, 5, True),
                             # ('sequential', 'content_hash', 1, 0, False),
                             # ('sequential', 'content_hash', 1, 0, True),
                             # ('sequential', 'content_hash', 1, 5, False),
                             # ('sequential', 'content_hash', 1, 5, True),
                             # ('sequential', 'content_hash', 3, 0, False),
                             # ('sequential', 'content_hash', 3, 0, True),
                             # ('sequential', 'content_hash', 3, 5, False),
                             # ('sequential', 'content_hash', 3, 5, True),
                             # ('parallel', 'id', 1, 0, False),
                             # ('parallel', 'id', 1, 0, True),
                             # ('parallel', 'id', 1, 5, False),
                             # ('parallel', 'id', 1, 5, True),
                             # ('parallel', 'id', 3, 0, False),
                             # ('parallel', 'id', 3, 0, True),
                             # ('parallel', 'id', 3, 5, False),
                             # ('parallel', 'id', 3, 5, True),
                             # ('parallel', 'content_hash', 1, 0, False),
                             # ('parallel', 'content_hash', 1, 0, True),
                             # ('parallel', 'content_hash', 1, 5, False),
                             # ('parallel', 'content_hash', 1, 5, True),
                             # ('parallel', 'content_hash', 3, 0, False),
                             # ('parallel', 'content_hash', 3, 0, True),
                             # ('parallel', 'content_hash', 3, 5, False),
                             # ('parallel', 'content_hash', 3, 5, True)
                         ])
def test_cache_crud(
        random_workspace,
        indexers,
        field,
        shards,
        chunks,
        same_content
):
    print(f'{random_workspace=}')
    print(f'Running combination: {indexers=}, {field=}, {shards=}, {chunks=}')

    config(field, random_workspace)
    f = get_flow(indexers, shards)
    print('Flow = ')
    print(f'{f.yaml_spec}')
    print(f'{f=}')

    docs = list(get_documents(chunks=chunks, content_same=same_content))
    print(f'{len(docs)=}')

    with f:
        f.index(docs)

    print(f'files: {os.listdir(random_workspace)}')

    check_indexers_size(chunks, len(docs), field, random_workspace, same_content, shards)

    # with f:
    #     f.index(docs)
    #
    # check_indexers_size(chunks, len(docs), field, random_workspace, same_content, shards)

    ids = [d for d in docs]
    ids.extend([c for d in docs for c in d.chunks])
    with f:
        f.delete(np.array(ids))

    check_indexers_size(chunks, 0, field, random_workspace, same_content, shards)


def check_indexers_size(chunks, nr_docs, field, random_workspace, same_content, shards):
    for indexer_fname in [KV_IDX_FILENAME, VEC_IDX_FILENAME]:
        if shards == 1:
            with BaseIndexer.load(random_workspace / indexer_fname) as indexer:
                if indexer_fname == KV_IDX_FILENAME:
                    assert isinstance(indexer, BinaryPbIndexer)
                else:
                    assert isinstance(indexer, NumpyIndexer)
                if field == 'content_hash' and same_content:
                    assert indexer.size == 1 + chunks
                else:
                    assert indexer.size == nr_docs + chunks * nr_docs
        else:
            for i in range(shards):
                full_size = 0
                indexer_folder = 'docindexer' if indexer_fname == KV_IDX_FILENAME else 'vecindexer'
                with BaseIndexer.load(random_workspace / f'inc_{indexer_folder}-{i + 1}' / indexer_fname) as indexer:
                    if indexer_fname == KV_IDX_FILENAME:
                        assert isinstance(indexer, BinaryPbIndexer)
                    else:
                        assert isinstance(indexer, NumpyIndexer)
                    full_size += indexer.size

            if field == 'content_hash' and same_content:
                assert full_size == 1 + chunks
            else:
                assert full_size == nr_docs + chunks * nr_docs
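
As a quick sanity check of the size assertions above, a worked example for the one active combination ('sequential', 'id', 1 shard, 5 chunks, distinct content) with the generator's default of 10 documents:

# Expected indexer sizes asserted by check_indexers_size for the active test case.
nr_docs, chunks = 10, 5
after_index = nr_docs + chunks * nr_docs  # 10 parent docs + 5 chunks each = 60 entries
after_delete = 0 + chunks * 0             # everything is deleted again = 0 entries
print(after_index, after_delete)          # 60 0
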
60 changes: 60 additions & 0 deletions tests/integration/docidcache/yml/cp_cache_pb.yml
@@ -0,0 +1,60 @@
!UniquePbIndexer
components:
  - !DocIDCache
    with:
      field: $JINA_CACHE_FIELD
    metas:
      workspace: $JINA_TEST_CACHE_CRUD_WORKSPACE
      name: cache
  - !BinaryPbIndexer
    with:
      index_filename: doc.gz
    metas:
      workspace: $JINA_TEST_CACHE_CRUD_WORKSPACE
      name: $JINA_KV_IDX_FILENAME
metas:
  name: inc_docindexer
  workspace: $JINA_TEST_CACHE_CRUD_WORKSPACE
requests:
  on:
    UpdateRequest:
      - !BaseCacheDriver
        with:
          method: update
          executor: cache
          traversal_paths: [ 'r' ]
      - !KVIndexDriver
        with:
          method: update
          executor: $JINA_KV_IDX_FILENAME
          traversal_paths: [ 'r' ]
    DeleteRequest:
      - !BaseCacheDriver
        with:
          method: delete
          executor: cache
          traversal_paths: [ 'r' ]
      - !KVIndexDriver
        with:
          method: delete
          executor: $JINA_KV_IDX_FILENAME
          traversal_paths: [ 'r' ]
    IndexRequest:
      - !BaseCacheDriver
        with:
          executor: cache
          traversal_paths: ['r']
      - !KVIndexDriver
        with:
          executor: $JINA_KV_IDX_FILENAME
          traversal_paths: ['r']
    SearchRequest:
      - !BaseCacheDriver
        with:
          executor: cache
          top_k: $JINA_TOPK
          traversal_paths: ['r']
      - !KVSearchDriver
        with:
          executor: $JINA_KV_IDX_FILENAME
          traversal_paths: ['m']
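
For orientation, a minimal sketch of how the test above drives this YAML through a Flow. The env-var values and the workspace path are illustrative assumptions; the request routing in the comments mirrors the requests section above.

import os

import numpy as np
from jina import Flow, Document

# Mirror config() from the test: the placeholders in the YAML resolve to these values.
os.environ['JINA_CACHE_FIELD'] = 'id'
os.environ['JINA_TEST_CACHE_CRUD_WORKSPACE'] = '/tmp/cache_crud'  # illustrative workspace
os.environ['JINA_KV_IDX_FILENAME'] = 'kv_idx'

docs = []
for i in range(10):
    with Document() as d:
        d.id = i
        d.text = f'hello world {i}'
        d.embedding = np.random.random([9])
    docs.append(d)

f = Flow().add(uses='tests/integration/docidcache/yml/cp_cache_pb.yml', name='inc_doc')
with f:
    f.index(docs)             # IndexRequest -> BaseCacheDriver + KVIndexDriver
with f:
    f.delete(np.array(docs))  # DeleteRequest -> cache and KV entries are removed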
