test: add cache-centric integration tests for crud
cristianmtr committed Jan 7, 2021
1 parent 51d7e50 commit 4295a92
Showing 11 changed files with 297 additions and 4 deletions.
6 changes: 4 additions & 2 deletions jina/executors/indexers/cache.py
@@ -127,11 +127,13 @@ def delete(self, keys: Iterator['UniqueId'], *args, **kwargs):

     def get_add_handler(self):
         # not needed, as we use the queryhandler
-        pass
+        # FIXME better way to silence warnings
+        return 1

     def get_query_handler(self) -> CacheHandler:
         return self.CacheHandler(self.index_abspath, self.logger)

     def get_create_handler(self):
         # not needed, as we use the queryhandler
-        pass
+        # FIXME better way to silence warnings
+        return 1
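For context, below is a minimal sketch (not part of this commit) of the pattern the two overrides above follow. The base-class behaviour shown -- a falsy handler triggering a "handler not set" warning -- is an assumption for illustration, not Jina's actual BaseIndexer code; the FIXME in the diff only tells us that returning a truthy dummy value silences a warning while all real work goes through the query handler.

import logging

# Hypothetical sketch -- the warning logic below is assumed for illustration,
# not copied from Jina's BaseIndexer.
class _BaseIndexerSketch:
    logger = logging.getLogger('sketch')

    @property
    def write_handler(self):
        handler = self.get_add_handler()
        if not handler:
            # the kind of warning the dummy return value avoids
            self.logger.warning('add handler not set')
        return handler

    def get_add_handler(self):
        return None


class _CacheSketch(_BaseIndexerSketch):
    def get_add_handler(self):
        # mirrors the workaround in the diff above: a truthy placeholder
        # suppresses the (assumed) warning; the cache only uses its query handler
        return 1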
Empty file added tests/integration/cache.py
Empty file.
Empty file.
246 changes: 246 additions & 0 deletions tests/integration/docidcache/test_crud_cache.py
@@ -0,0 +1,246 @@
import os

import numpy as np
import pytest

from jina import Flow, Document
from jina.executors.indexers import BaseIndexer
from jina.executors.indexers.cache import DocIDCache
from jina.executors.indexers.keyvalue import BinaryPbIndexer
from jina.executors.indexers.vector import NumpyIndexer
from tests.integration.incremental_indexing import random_workspace
from pathlib import Path

cur_dir = Path(os.path.dirname(os.path.abspath(__file__)))

# don't remove this line, otherwise auto-code-format will remove `random_workspace`
print(random_workspace)

KV_IDX_FILENAME = 'kv_idx.bin'
VEC_IDX_FILENAME = 'vec_idx.bin'


def config(field, tmp_workspace):
    os.environ['JINA_CACHE_FIELD'] = field
    os.environ['JINA_TEST_CACHE_CRUD_WORKSPACE'] = str(tmp_workspace)
    os.environ['JINA_KV_IDX_FILENAME'] = KV_IDX_FILENAME.split('.bin')[0]
    os.environ['JINA_VEC_IDX_FILENAME'] = VEC_IDX_FILENAME.split('.bin')[0]


def get_flow(indexers, shards):
    vector_pod = dict(
        uses=cur_dir / 'yml' / 'cp_cache_vector.yml',
        name='inc_vec'
    )
    pb_pod = dict(
        uses=cur_dir / 'yml' / 'cp_cache_pb.yml',
        name='inc_doc'
    )

    if shards == 3:
        sharding_config = dict(
            shards=shards,
            separated_workspace=True,
            uses_after='_merge_matches',
            polling='all',
            timeout_ready='-1'
        )
        pb_pod.update(
            sharding_config
        )
        vector_pod.update(
            sharding_config
        )

    f: Flow = (Flow()
               .add(**pb_pod)
               .add(**vector_pod))

    if indexers == 'parallel':
        vector_pod.update(
            dict(
                needs=['gateway']
            )
        )
        f = (Flow()
             .add(**pb_pod)
             .add(**vector_pod)
             .add(needs=['inc_vec', 'inc_doc']))

    flow_plot = os.path.join(os.environ.get('JINA_TEST_CACHE_CRUD_WORKSPACE'), f'flow-{indexers}-{shards}.svg')
    f.plot(flow_plot)
    print(f'Flow plotted to {flow_plot}')
    return f


d_embedding = np.random.random([9])
c_embedding = np.random.random([9])


def get_documents(chunks, content_same, nr=10):
    next_chunk_id = nr
    for i in range(nr):
        with Document() as d:
            d.id = i
            if content_same:
                d.text = 'hello world'
                d.embedding = d_embedding
            else:
                d.text = f'hello world {i}'
                d.embedding = np.random.random([9])
            for j in range(chunks):
                with Document() as c:
                    c.id = next_chunk_id
                    if content_same:
                        c.text = 'hello world from chunk'
                        c.embedding = c_embedding
                    else:
                        c.text = f'hello world from chunk {j}'
                        c.embedding = np.random.random([9])

                next_chunk_id += 1
                d.chunks.append(c)
        yield d


@pytest.mark.parametrize('chunks, content_same, nr',
                         # cartesian product of possibilities
                         [(0, False, 0),
                          (0, False, 10),
                          (0, False, 100),
                          (0, True, 0),
                          (0, True, 10),
                          (0, True, 100),
                          (3, False, 0),
                          (3, False, 10),
                          (3, False, 100),
                          (3, True, 0),
                          (3, True, 10),
                          (3, True, 100),
                          (5, False, 0),
                          (5, False, 10),
                          (5, False, 100),
                          (5, True, 0),
                          (5, True, 10),
                          (5, True, 100)])
def test_docs_generator(chunks, content_same, nr):
    docs = list(get_documents(chunks=chunks, content_same=content_same, nr=nr))
    assert len(docs) == nr
    ids_used = set()
    for i, d in enumerate(docs):
        assert d.id not in ids_used
        ids_used.add(d.id)

        if content_same:
            assert d.text == 'hello world'
            np.testing.assert_almost_equal(d.embedding, d_embedding)
        else:
            assert d.text == f'hello world {i}'
            assert d.embedding.shape == d_embedding.shape

        assert len(d.chunks) == chunks

        for j, c in enumerate(d.chunks):
            assert c.id not in ids_used
            ids_used.add(c.id)
            if content_same:
                assert c.text == 'hello world from chunk'
                np.testing.assert_almost_equal(c.embedding, c_embedding)
            else:
                assert c.text == f'hello world from chunk {j}'
                assert c.embedding.shape == c_embedding.shape
    print(f'{len(ids_used)=}')


@pytest.mark.parametrize('indexers, field, shards, chunks, same_content',
                         [
                             # cartesian product of the parameters
                             # TODO prune these
                             ('sequential', 'id', 1, False, False),
                             ('sequential', 'id', 1, False, True),
                             ('sequential', 'id', 1, True, False),
                             ('sequential', 'id', 1, True, True),
                             ('sequential', 'id', 3, False, False),
                             ('sequential', 'id', 3, False, True),
                             ('sequential', 'id', 3, True, False),
                             ('sequential', 'id', 3, True, True),
                             ('sequential', 'content_hash', 1, False, False),
                             ('sequential', 'content_hash', 1, False, True),
                             ('sequential', 'content_hash', 1, True, False),
                             ('sequential', 'content_hash', 1, True, True),
                             ('sequential', 'content_hash', 3, False, False),
                             ('sequential', 'content_hash', 3, False, True),
                             ('sequential', 'content_hash', 3, True, False),
                             ('sequential', 'content_hash', 3, True, True),
                             # ('parallel', 'id', 1, False, False),
                             # ('parallel', 'id', 1, False, True),
                             # ('parallel', 'id', 1, True, False),
                             # ('parallel', 'id', 1, True, True),
                             # ('parallel', 'id', 3, False, False),
                             # ('parallel', 'id', 3, False, True),
                             # ('parallel', 'id', 3, True, False),
                             # ('parallel', 'id', 3, True, True),
                             # ('parallel', 'content_hash', 1, False, False),
                             # ('parallel', 'content_hash', 1, False, True),
                             # ('parallel', 'content_hash', 1, True, False),
                             # ('parallel', 'content_hash', 1, True, True),
                             # ('parallel', 'content_hash', 3, False, False),
                             # ('parallel', 'content_hash', 3, False, True),
                             # ('parallel', 'content_hash', 3, True, False),
                             # ('parallel', 'content_hash', 3, True, True)
                         ])
def test_cache_crud(
        random_workspace,
        indexers,
        field,
        shards,
        chunks,
        same_content
):
    print(f'{random_workspace=}')
    print(f'Running combination: {indexers=}, {field=}, {shards=}, {chunks=}, {same_content=}')

    config(field, random_workspace)
    f = get_flow(indexers, shards)
    print('Flow = ')
    print(f'{f.yaml_spec}')
    print(f'{f=}')

    docs = list(get_documents(chunks=chunks, content_same=same_content))
    print(f'{len(docs)=}')

    with f:
        f.index(docs)

    check_indexers_size(chunks, len(docs), field, random_workspace, same_content, shards)

    # index the same documents a second time, going through the cache again
    with f:
        f.index(docs)


def check_indexers_size(chunks, nr_docs, field, random_workspace, same_content, shards):
    for indexer_fname in [KV_IDX_FILENAME, VEC_IDX_FILENAME]:
        if shards == 1:
            with BaseIndexer.load(random_workspace / indexer_fname) as indexer:
                if indexer_fname == KV_IDX_FILENAME:
                    assert isinstance(indexer, BinaryPbIndexer)
                else:
                    assert isinstance(indexer, NumpyIndexer)
                if field == 'content_hash' and same_content:
                    assert indexer.size == 1 + chunks
                else:
                    assert indexer.size == nr_docs + chunks * nr_docs
        else:
            full_size = 0
            for i in range(shards):
                indexer_folder = 'docindexer' if indexer_fname == KV_IDX_FILENAME else 'vecindexer'
                with BaseIndexer.load(random_workspace / f'inc_{indexer_folder}-{i + 1}' / indexer_fname) as indexer:
                    if indexer_fname == KV_IDX_FILENAME:
                        assert isinstance(indexer, BinaryPbIndexer)
                    else:
                        assert isinstance(indexer, NumpyIndexer)
                    full_size += indexer.size

            if field == 'content_hash' and same_content:
                assert full_size == 1 + chunks
            else:
                assert full_size == nr_docs + chunks * nr_docs
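To make the size assertions above concrete, here is a small worked example of the two expected-size formulas used in check_indexers_size. It is plain Python with illustrative parameter values, not output from an actual test run:

def expected_size(nr_docs, chunks, field, same_content):
    # Mirrors the arithmetic in check_indexers_size(): with content-hash
    # caching and identical content, only one document (plus its chunks)
    # is expected in the index; otherwise every document and every chunk is.
    if field == 'content_hash' and same_content:
        return 1 + chunks
    return nr_docs + chunks * nr_docs


# e.g. 10 documents with 3 chunks each:
assert expected_size(10, 3, 'id', True) == 40             # 10 docs + 3 * 10 chunks
assert expected_size(10, 3, 'content_hash', True) == 4    # 1 doc + 3 chunks after dedup
assert expected_size(10, 3, 'content_hash', False) == 40  # distinct content, nothing deduped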
17 changes: 17 additions & 0 deletions tests/integration/docidcache/yml/cp_cache_pb.yml
@@ -0,0 +1,17 @@
!UniquePbIndexer
components:
  - !DocIDCache
    with:
      field: $JINA_CACHE_FIELD
    metas:
      workspace: $JINA_TEST_CACHE_CRUD_WORKSPACE
      name: cache
  - !BinaryPbIndexer
    with:
      index_filename: doc.gz
    metas:
      workspace: $JINA_TEST_CACHE_CRUD_WORKSPACE
      name: $JINA_KV_IDX_FILENAME
metas:
  name: inc_docindexer
  workspace: $JINA_TEST_CACHE_CRUD_WORKSPACE
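The $JINA_* placeholders above are resolved from environment variables when the YAML is loaded; in these tests the config() helper in test_crud_cache.py sets them before the Flow is started. A minimal sketch with illustrative values (the variable names come from the YAML and from config(); the concrete values are made up):

import os

# Illustrative values only -- the test derives these from its parameters
# and a temporary workspace fixture.
os.environ['JINA_CACHE_FIELD'] = 'content_hash'          # or 'id'
os.environ['JINA_TEST_CACHE_CRUD_WORKSPACE'] = '/tmp/cache_crud_ws'
os.environ['JINA_KV_IDX_FILENAME'] = 'kv_idx'

# With these set, the DocIDCache component caches on content_hash and the
# BinaryPbIndexer writes doc.gz under /tmp/cache_crud_ws with metas name 'kv_idx'.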
19 changes: 19 additions & 0 deletions tests/integration/docidcache/yml/cp_cache_vector.yml
@@ -0,0 +1,19 @@
!UniqueVectorIndexer
components:
  - !DocIDCache
    with:
      field: $JINA_CACHE_FIELD
      index_path: cache.bin
    metas:
      workspace: $JINA_TEST_CACHE_CRUD_WORKSPACE
      name: cache
  - !NumpyIndexer
    with:
      index_filename: vec.gz
      metric: euclidean
    metas:
      workspace: $JINA_TEST_CACHE_CRUD_WORKSPACE
      name: $JINA_VEC_IDX_FILENAME
metas:
  name: inc_vecindexer
  workspace: $JINA_TEST_CACHE_CRUD_WORKSPACE
2 changes: 2 additions & 0 deletions tests/integration/incremental_indexing/_unique_doc.yml
@@ -1,6 +1,8 @@
 !DocIDCache
 with:
   index_path: cache.tmp
+  field: id
+  # TODO make it explicit
 metas:
   name: cache-doc
   workspace: $JINA_TEST_INCREMENTAL_INDEX_WORKSPACE/doc_cache
1 change: 1 addition & 0 deletions tests/integration/incremental_indexing/_unique_vec.yml
@@ -1,6 +1,7 @@
 !DocIDCache
 with:
   index_path: cache.tmp
+  field: id
 metas:
   name: cache-vec
   workspace: $JINA_TEST_INCREMENTAL_INDEX_WORKSPACE/vec_cache
@@ -203,3 +203,7 @@ def test_incremental_indexing_parallel_indexers_with_shards(random_workspace):
             assert isinstance(doc_indexer, BinaryPbIndexer)
             doc_idx_size += doc_indexer._size
     assert doc_idx_size == num_uniq_docs
+
+
+# TODO chunks
+# TODO update / delete
2 changes: 2 additions & 0 deletions tests/integration/incremental_indexing/uniq_docindexer.yml
@@ -1,6 +1,8 @@
 !UniquePbIndexer
 components:
   - !DocIDCache
+    with:
+      field: id
     metas:
       workspace: $JINA_TEST_INCREMENTAL_INDEX_WORKSPACE
       name: cache
@@ -19,7 +19,7 @@ def get_index_flow():


 def get_search_flow():
-    num_shards = 2
+    num_shards = 3
     f = Flow(read_only=True) \
         .add(
             uses='vectorindexer.yml',
@@ -40,7 +40,7 @@ def test_sharding_empty_index(tmpdir, execution_number, mocker):

     f = get_index_flow()

-    num_docs = 1
+    num_docs = 10
     data = []
     for i in range(num_docs):
         with Document() as doc:
