Commit

test: add cache-centric integration tests for crud

cristianmtr committed Jan 8, 2021
1 parent 2eb4e02 commit 63034b7

Showing 13 changed files with 402 additions and 19 deletions.
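The tests added here drive the cache through the Flow CRUD interface. For orientation, a minimal sketch of that interface as the tests use it (the built-in `_pass` executor stands in for the cache setup under test; this is illustrative, not part of the commit):

import numpy as np

from jina import Flow, Document

docs = []
for i in range(3):
    with Document() as d:
        d.text = f'doc {i}'
        d.embedding = np.random.random([9])
    docs.append(d)

with Flow().add(uses='_pass') as f:
    f.index(docs)   # IndexRequest: cache drivers decide hit/miss per document
    f.delete(docs)  # DeleteRequest: drivers remove the ids from the indexers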
25 changes: 15 additions & 10 deletions jina/drivers/cache.py
@@ -1,7 +1,7 @@
 from typing import Any, Dict
 
 from .index import BaseIndexDriver
-from ..executors.indexers.cache import DATA_FIELD, CONTENT_HASH_KEY
+from ..executors.indexers.cache import DATA_FIELD, CONTENT_HASH_KEY, ID_KEY
 
 if False:
     from .. import Document
@@ -26,15 +26,20 @@ def __init__(self, with_serialization: bool = False, *args, **kwargs):
     def _apply_all(self, docs: 'DocumentSet', *args, **kwargs) -> None:
         self.field = self.exec.field
 
-        for d in docs:
-            data = d.id
-            if self.field == CONTENT_HASH_KEY:
-                data = d.content_hash
-            result = self.exec[data]
-            if result is None:
-                self.on_miss(d, data)
-            else:
-                self.on_hit(d, result)
+        if self._method_name == 'delete':
+            self.exec_fn([d.id for d in docs])
+        elif self._method_name == 'update':
+            self.exec_fn([d.id for d in docs], [d.id if self.field == ID_KEY else d.content_hash for d in docs])
+        else:
+            for d in docs:
+                data = d.id
+                if self.field == CONTENT_HASH_KEY:
+                    data = d.content_hash
+                result = self.exec[data]
+                if result is None:
+                    self.on_miss(d, data)
+                else:
+                    self.on_hit(d, result)
 
     def on_miss(self, doc: 'Document', data) -> None:
         """Function to call when doc is missing, the default behavior is add to cache when miss
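The hunk above routes update and delete through `exec_fn` instead of the per-document hit/miss loop. A standalone sketch of that dispatch, using plain Python stand-ins rather than the actual Jina classes:

from dataclasses import dataclass
from typing import Callable, List

ID_KEY, CONTENT_HASH_KEY = 'id', 'content_hash'

@dataclass
class Doc:
    id: str
    content_hash: str

def apply_all(method_name: str, field: str, exec_fn: Callable, docs: List[Doc]):
    if method_name == 'delete':
        exec_fn([d.id for d in docs])  # delete needs only the ids
    elif method_name == 'update':
        exec_fn([d.id for d in docs],  # pair each id with its new cached value
                [d.id if field == ID_KEY else d.content_hash for d in docs])
    else:
        pass  # index path: per-document cache hit/miss handling

apply_all('update', CONTENT_HASH_KEY, print, [Doc('0', 'abc'), Doc('1', 'def')])
# prints: ['0', '1'] ['abc', 'def']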
8 changes: 5 additions & 3 deletions jina/executors/indexers/cache.py
@@ -41,7 +41,7 @@ def __init__(self, path, logger):
             self.ids = pickle.load(open(path + '.ids', 'rb'))
             self.content_hash = pickle.load(open(path + '.cache', 'rb'))
         except FileNotFoundError as e:
-            logger.warning(
+            logger.debug(
                 f'File path did not exist : {path}.ids or {path}.cache: {repr(e)}. Creating new CacheHandler...')
             self.ids = []
             self.content_hash = []
@@ -122,11 +122,13 @@ def delete(self, keys: Iterator['UniqueId'], *args, **kwargs):
 
     def get_add_handler(self):
         # not needed, as we use the queryhandler
-        pass
+        # FIXME better way to silence warnings
+        return 1
 
     def get_query_handler(self) -> CacheHandler:
         return self.CacheHandler(self.index_abspath, self.logger)
 
     def get_create_handler(self):
         # not needed, as we use the queryhandler
-        pass
+        # FIXME better way to silence warnings
+        return 1
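The first hunk downgrades the missing-cache-file message from `warning` to `debug`: a missing file is the normal first-run case, not an error worth surfacing. A self-contained sketch of that load-or-create pattern (paths and structure illustrative, not the real CacheHandler):

import logging
import pickle

logger = logging.getLogger('cache-sketch')

def load_or_create(path: str):
    try:
        with open(path + '.ids', 'rb') as f_ids, open(path + '.cache', 'rb') as f_cache:
            ids = pickle.load(f_ids)
            content_hash = pickle.load(f_cache)
    except FileNotFoundError as e:
        # first run: nothing persisted yet, start empty and only log at debug level
        logger.debug(f'File path did not exist: {path}.ids or {path}.cache: {e!r}')
        ids, content_hash = [], []
    return ids, content_hash

assert load_or_create('/tmp/no-such-cache') == ([], [])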
Empty file added tests/integration/cache.py
Empty file.
255 changes: 255 additions & 0 deletions tests/integration/docidcache/test_crud_cache.py
@@ -0,0 +1,255 @@
import os

import numpy as np
import pytest

from jina import Flow, Document
from jina.executors.indexers import BaseIndexer
from jina.executors.indexers.cache import DocIDCache
from jina.executors.indexers.keyvalue import BinaryPbIndexer
from jina.executors.indexers.vector import NumpyIndexer
from tests.integration.incremental_indexing import random_workspace
from pathlib import Path

cur_dir = Path(os.path.dirname(os.path.abspath(__file__)))

# don't remove this line, otherwise auto-code-format will remove `random_workspace`
print(random_workspace)

KV_IDX_FILENAME = 'kv_idx.bin'
VEC_IDX_FILENAME = 'vec_idx.bin'


def config(field, tmp_workspace):
    os.environ['JINA_CACHE_FIELD'] = field
    os.environ['JINA_TEST_CACHE_CRUD_WORKSPACE'] = str(tmp_workspace)
    os.environ['JINA_KV_IDX_NAME'] = KV_IDX_FILENAME.split('.bin')[0]
    os.environ['JINA_VEC_IDX_NAME'] = VEC_IDX_FILENAME.split('.bin')[0]


def get_flow(indexers, shards):
    vector_pod = dict(
        uses=cur_dir / 'yml' / 'cp_cache_kv.yml',
        name='inc_vec'
    )
    pb_pod = dict(
        uses=cur_dir / 'yml' / 'cp_cache_vector.yml',
        name='inc_doc'
    )

    if shards == 3:
        sharding_config = dict(
            shards=shards,
            separated_workspace=True,
            uses_after='_merge_matches',
            polling='all',
            timeout_ready='-1'
        )
        pb_pod.update(sharding_config)
        vector_pod.update(sharding_config)

    f: Flow = (Flow()
               .add(**pb_pod)
               .add(**vector_pod))

    if indexers == 'parallel':
        vector_pod.update(
            dict(
                needs=['gateway']
            )
        )
        f = (Flow()
             .add(**pb_pod)
             .add(**vector_pod)
             .add(needs=['inc_vec', 'inc_doc']))

    flow_plot = os.path.join(os.environ.get('JINA_TEST_CACHE_CRUD_WORKSPACE'), f'flow-{indexers}-{shards}.svg')
    f.plot(flow_plot)
    print(f'Flow plotted to {flow_plot}')
    return f


d_embedding = np.random.random([9])
c_embedding = np.random.random([9])


def get_documents(chunks, content_same, nr=10):
    next_chunk_id = nr
    for i in range(nr):
        with Document() as d:
            d.id = i
            if content_same:
                d.text = 'hello world'
                d.embedding = d_embedding
            else:
                d.text = f'hello world {i}'
                d.embedding = np.random.random([9])
            for j in range(chunks):
                with Document() as c:
                    c.id = next_chunk_id
                    if content_same:
                        c.text = 'hello world from chunk'
                        c.embedding = c_embedding
                    else:
                        c.text = f'hello world from chunk {j}'
                        c.embedding = np.random.random([9])

                next_chunk_id += 1
                d.chunks.append(c)
        yield d


@pytest.mark.parametrize('chunks, content_same, nr',
                         # cartesian product of possibilities
                         [(0, False, 0),
                          (0, False, 10),
                          (0, False, 100),
                          (0, True, 0),
                          (0, True, 10),
                          (0, True, 100),
                          (3, False, 0),
                          (3, False, 10),
                          (3, False, 100),
                          (3, True, 0),
                          (3, True, 10),
                          (3, True, 100),
                          (5, False, 0),
                          (5, False, 10),
                          (5, False, 100),
                          (5, True, 0),
                          (5, True, 10),
                          (5, True, 100)])
def test_docs_generator(chunks, content_same, nr):
    docs = list(get_documents(chunks=chunks, content_same=content_same, nr=nr))
    assert len(docs) == nr
    ids_used = set()
    for i, d in enumerate(docs):
        assert d.id not in ids_used
        ids_used.add(d.id)

        if content_same:
            assert d.text == 'hello world'
            np.testing.assert_almost_equal(d.embedding, d_embedding)
        else:
            assert d.text == f'hello world {i}'
            assert d.embedding.shape == d_embedding.shape

        assert len(d.chunks) == chunks

        for j, c in enumerate(d.chunks):
            assert c.id not in ids_used
            ids_used.add(c.id)
            if content_same:
                assert c.text == 'hello world from chunk'
                np.testing.assert_almost_equal(c.embedding, c_embedding)
            else:
                assert c.text == f'hello world from chunk {j}'
                assert c.embedding.shape == c_embedding.shape
    print(f'{len(ids_used)=}')


@pytest.mark.parametrize('indexers, field, shards, chunks, same_content',
                         [
                             # cartesian product of the parameters
                             # TODO prune these
                             ('sequential', 'id', 1, 0, False),
                             # ('sequential', 'id', 1, 0, True),
                             ('sequential', 'id', 1, 5, False),
                             # ('sequential', 'id', 1, 5, True),
                             # ('sequential', 'id', 3, 0, False),
                             # ('sequential', 'id', 3, 0, True),
                             ('sequential', 'id', 3, 5, False),
                             # ('sequential', 'id', 3, 5, True),
                             ('sequential', 'content_hash', 1, 0, False),
                             # ('sequential', 'content_hash', 1, 0, True),
                             ('sequential', 'content_hash', 1, 5, False),
                             # ('sequential', 'content_hash', 1, 5, True),
                             # ('sequential', 'content_hash', 3, 0, False),
                             # ('sequential', 'content_hash', 3, 0, True),
                             ('sequential', 'content_hash', 3, 5, False),
                             # ('sequential', 'content_hash', 3, 5, True),
                             ('parallel', 'id', 1, 0, False),
                             # ('parallel', 'id', 1, 0, True),
                             ('parallel', 'id', 1, 5, False),
                             # ('parallel', 'id', 1, 5, True),
                             # ('parallel', 'id', 3, 0, False),
                             # ('parallel', 'id', 3, 0, True),
                             ('parallel', 'id', 3, 5, False),
                             # ('parallel', 'id', 3, 5, True),
                             ('parallel', 'content_hash', 1, 0, False),
                             # ('parallel', 'content_hash', 1, 0, True),
                             ('parallel', 'content_hash', 1, 5, False),
                             # ('parallel', 'content_hash', 1, 5, True),
                             # ('parallel', 'content_hash', 3, 0, False),
                             # ('parallel', 'content_hash', 3, 0, True),
                             ('parallel', 'content_hash', 3, 5, False),
                             # ('parallel', 'content_hash', 3, 5, True)
                         ])
def test_cache_crud(
        random_workspace,
        indexers,
        field,
        shards,
        chunks,
        same_content
):
    print(f'{random_workspace=}')
    print(f'Running combination: {indexers=}, {field=}, {shards=}, {chunks=}')

    config(field, random_workspace)
    f = get_flow(indexers, shards)
    print('Flow = ')
    print(f'{f.yaml_spec}')
    print(f'{f=}')

    docs = list(get_documents(chunks=chunks, content_same=same_content))
    print(f'{len(docs)=}')

    with f:
        f.index(docs)

    print(f'files: {os.listdir(random_workspace)}')

    check_indexers_size(chunks, len(docs), field, random_workspace, same_content, shards)

    # with f:
    #     f.index(docs)
    #
    # check_indexers_size(chunks, len(docs), field, random_workspace, same_content, shards)

    with f:
        f.delete(docs)

    check_indexers_size(chunks, 0, field, random_workspace, same_content, shards)


def check_indexers_size(chunks, nr_docs, field, random_workspace, same_content, shards):
    for indexer_fname in [KV_IDX_FILENAME, VEC_IDX_FILENAME]:
        if shards == 1:
            with BaseIndexer.load(random_workspace / indexer_fname) as indexer:
                if indexer_fname == KV_IDX_FILENAME:
                    assert isinstance(indexer, BinaryPbIndexer)
                else:
                    assert isinstance(indexer, NumpyIndexer)
                if field == 'content_hash' and same_content:
                    assert indexer.size == 1 + chunks
                else:
                    assert indexer.size == nr_docs + chunks * nr_docs
        else:
            # accumulate across shards before asserting, so the expected
            # totals hold regardless of how docs are distributed
            full_size = 0
            indexer_folder = 'docindexer' if indexer_fname == KV_IDX_FILENAME else 'vecindexer'
            for i in range(shards):
                with BaseIndexer.load(random_workspace / f'inc_{indexer_folder}-{i + 1}' / indexer_fname) as indexer:
                    if indexer_fname == KV_IDX_FILENAME:
                        assert isinstance(indexer, BinaryPbIndexer)
                    else:
                        assert isinstance(indexer, NumpyIndexer)
                    full_size += indexer.size

            if field == 'content_hash' and same_content:
                assert full_size == 1 + chunks
            else:
                assert full_size == nr_docs + chunks * nr_docs
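The parameter lists above are a hand-enumerated cartesian product with many combinations commented out (note the "TODO prune these"). A sketch of deriving the active set programmatically instead, purely illustrative and not part of the commit:

from itertools import product

active = [
    (indexers, field, shards, chunks, same_content)
    for indexers, field, shards, chunks, same_content in product(
        ['sequential', 'parallel'],
        ['id', 'content_hash'],
        [1, 3],
        [0, 5],
        [False, True],
    )
    # the commented-out cases: same_content=True and the (shards=3, chunks=0) rows
    if not same_content and not (shards == 3 and chunks == 0)
]
assert len(active) == 12  # matches the 12 uncommented tuples above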
55 changes: 55 additions & 0 deletions tests/integration/docidcache/yml/cp_cache_kv.yml
@@ -0,0 +1,55 @@
!UniquePbIndexer
components:
  - !DocIDCache
    with:
      field: $JINA_CACHE_FIELD
    metas:
      workspace: $JINA_TEST_CACHE_CRUD_WORKSPACE
      name: cache
  - !BinaryPbIndexer
    with:
      index_filename: doc.gz
    metas:
      workspace: $JINA_TEST_CACHE_CRUD_WORKSPACE
      name: $JINA_KV_IDX_NAME
metas:
  name: inc_docindexer
  workspace: $JINA_TEST_CACHE_CRUD_WORKSPACE
requests:
  on:
    UpdateRequest:
      - !BaseCacheDriver
        with:
          method: update
          executor: cache
          traversal_paths: [r, c]
      - !KVIndexDriver
        with:
          method: update
          executor: $JINA_KV_IDX_NAME
          traversal_paths: [r, c]
    DeleteRequest:
      - !BaseCacheDriver
        with:
          method: delete
          executor: cache
          traversal_paths: [r, c]
      - !KVIndexDriver
        with:
          method: delete
          executor: $JINA_KV_IDX_NAME
          traversal_paths: [r, c]
    IndexRequest:
      - !BaseCacheDriver
        with:
          executor: cache
          traversal_paths: [r, c]
      - !KVIndexDriver
        with:
          executor: $JINA_KV_IDX_NAME
          traversal_paths: [r, c]
    SearchRequest:
      - !KVSearchDriver
        with:
          executor: $JINA_KV_IDX_NAME
          traversal_paths: ['m']
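A hedged sketch of loading this YAML outside the test harness; it assumes the same environment variables the test's config() helper sets, and that BaseExecutor.load_config resolves them as in Jina 0.x:

import os

from jina.executors import BaseExecutor

os.environ['JINA_CACHE_FIELD'] = 'content_hash'
os.environ['JINA_TEST_CACHE_CRUD_WORKSPACE'] = '/tmp/cache_crud'
os.environ['JINA_KV_IDX_NAME'] = 'kv_idx'

# builds the compound executor: DocIDCache ('cache') plus BinaryPbIndexer;
# the requests.on section binds the cache/KV drivers per request type
with BaseExecutor.load_config('cp_cache_kv.yml') as executor:
    print([c.name for c in executor.components])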
