Skip to content

Commit

Permalink
feat: ref indexer avoid getting abspath (#1595)
Browse files Browse the repository at this point in the history
  • Loading branch information
JoanFM committed Jan 5, 2021
1 parent 63bc924 commit 692c2d9
Show file tree
Hide file tree
Showing 8 changed files with 157 additions and 2 deletions.
9 changes: 7 additions & 2 deletions jina/executors/indexers/vector.py
Expand Up @@ -49,7 +49,6 @@ def __init__(self,
self.compress_level = compress_level
self.key_bytes = b''
self.key_dtype = None
self._ref_index_abspath = None
self.valid_indices = np.array([], dtype=bool)

if ref_indexer:
Expand All @@ -62,8 +61,14 @@ def __init__(self,
self._size = ref_indexer._size
# point to the ref_indexer.index_filename
# so that later in `post_init()` it will load from the referred index_filename
self._ref_index_abspath = ref_indexer.index_abspath
self.valid_indices = ref_indexer.valid_indices
self.index_filename = ref_indexer.index_filename
self.logger.warning(f'\n'
f'num_dim extracted from `ref_indexer` to {ref_indexer.num_dim} \n'
f'_size extracted from `ref_indexer` to {ref_indexer._size} \n'
f'dtype extracted from `ref_indexer` to {ref_indexer.dtype} \n'
f'compress_level overriden from `ref_indexer` to {ref_indexer.compress_level} \n'
f'index_filename overriden from `ref_indexer` to {ref_indexer.index_filename}')

@property
def index_abspath(self) -> str:
Expand Down
Empty file.
5 changes: 5 additions & 0 deletions tests/integration/ref_indexer/index.yml
@@ -0,0 +1,5 @@
!Flow
pods:
indexer:
uses: pods/indexer.yml
parallel: $JINA_TEST_REF_INDEXER_PARALLEL
6 changes: 6 additions & 0 deletions tests/integration/ref_indexer/pods/indexer.yml
@@ -0,0 +1,6 @@
!NumpyIndexer
with:
index_filename: 'index.gz'
metas:
workspace: $JINA_TEST_INDEXER_WITH_REF_INDEXER
name: wrapidx
12 changes: 12 additions & 0 deletions tests/integration/ref_indexer/pods/indexer_with_ref.yml
@@ -0,0 +1,12 @@
!NumpyIndexer
with:
ref_indexer:
!NumpyIndexer
with:
index_filename: 'index.gz'
metas:
workspace: $JINA_TEST_INDEXER_WITH_REF_INDEXER_QUERY
name: wrapidx
metas:
name: indexer
workspace: $JINA_TEST_INDEXER_WITH_REF_INDEXER_QUERY
5 changes: 5 additions & 0 deletions tests/integration/ref_indexer/query.yml
@@ -0,0 +1,5 @@
!Flow
pods:
indexer:
uses: pods/indexer_with_ref.yml
parallel: $JINA_TEST_REF_INDEXER_PARALLEL
@@ -0,0 +1,91 @@
import os
import shutil

import numpy as np
import pytest

from jina.flow import Flow
from jina import Document


@pytest.fixture
def parallel(request):
os.environ['JINA_TEST_REF_INDEXER_PARALLEL'] = str(request.param)
yield
del os.environ['JINA_TEST_REF_INDEXER_PARALLEL']


@pytest.fixture
def index_docs():
docs = []
for idx in range(0, 100):
doc = Document()
doc.id = f'{idx:0>16}'
doc.embedding = doc.embedding = np.array([idx, idx])
docs.append(doc)
return docs


@pytest.fixture
def random_workspace(tmpdir):
os.environ['JINA_TEST_INDEXER_WITH_REF_INDEXER'] = str(tmpdir)
os.environ['JINA_TEST_INDEXER_WITH_REF_INDEXER_QUERY'] = str(tmpdir)
yield
del os.environ['JINA_TEST_INDEXER_WITH_REF_INDEXER']
del os.environ['JINA_TEST_INDEXER_WITH_REF_INDEXER_QUERY']


@pytest.mark.parametrize('parallel', [1, 2], indirect=True)
def test_indexer_with_ref_indexer(random_workspace, parallel, index_docs, mocker):
top_k = 10
with Flow.load_config('index.yml') as index_flow:
index_flow.index(input_fn=index_docs, batch_size=10)

mock = mocker.Mock()

def validate_response(resp):
mock()
assert len(resp.search.docs) == 1
assert len(resp.search.docs[0].matches) == top_k

query_document = Document()
query_document.embedding = np.array([1, 1])
with Flow.load_config('query.yml') as query_flow:
query_flow.search(input_fn=[query_document], on_done=validate_response, top_k=top_k)

mock.assert_called_once()


@pytest.fixture
def random_workspace_move(tmpdir):
os.environ['JINA_TEST_INDEXER_WITH_REF_INDEXER'] = str(tmpdir) + '/index'
os.environ['JINA_TEST_INDEXER_WITH_REF_INDEXER_QUERY'] = str(tmpdir) + '/query'
yield
del os.environ['JINA_TEST_INDEXER_WITH_REF_INDEXER']
del os.environ['JINA_TEST_INDEXER_WITH_REF_INDEXER_QUERY']


@pytest.mark.parametrize('parallel', [1, 2], indirect=True)
def test_indexer_with_ref_indexer_move(random_workspace_move, parallel, index_docs, mocker):
top_k = 10
with Flow.load_config('index.yml') as index_flow:
index_flow.index(input_fn=index_docs, batch_size=10)

mock = mocker.Mock()

shutil.copytree(os.environ['JINA_TEST_INDEXER_WITH_REF_INDEXER'],
os.environ['JINA_TEST_INDEXER_WITH_REF_INDEXER_QUERY'])

shutil.rmtree(os.environ['JINA_TEST_INDEXER_WITH_REF_INDEXER'])

def validate_response(resp):
mock()
assert len(resp.search.docs) == 1
assert len(resp.search.docs[0].matches) == top_k

query_document = Document()
query_document.embedding = np.array([1, 1])
with Flow.load_config('query.yml') as query_flow:
query_flow.search(input_fn=[query_document], on_done=validate_response, top_k=top_k)

mock.assert_called_once()
31 changes: 31 additions & 0 deletions tests/unit/executors/indexers/test_numpyindexer.py
Expand Up @@ -361,3 +361,34 @@ def test_numpy_indexer_known_and_delete(batch_size, compress_level, test_metas):
assert idx.shape == dist.shape
assert idx.shape == (len(queries), top_k)
np.testing.assert_equal(indexer.query_by_id([6, 5]), vectors[[2, 1]])


@pytest.mark.parametrize('compress_level', [0, 1, 2, 3])
def test_numpy_indexer_with_ref_indexer(compress_level, test_metas):
vectors = np.array([[1, 1, 1],
[10, 10, 10],
[100, 100, 100],
[1000, 1000, 1000]])
keys = np.array([4, 5, 6, 7]).reshape(-1, 1)
with NumpyIndexer(metric='euclidean', index_filename='np.test.gz', compress_level=compress_level,
metas=test_metas) as indexer:
indexer.add(keys, vectors)
indexer.save()
assert os.path.exists(indexer.index_abspath)
index_filename = indexer.index_filename

queries = np.array([[1, 1, 1],
[10, 10, 10],
[100, 100, 100],
[1000, 1000, 1000]])
with NumpyIndexer(metric='euclidean', ref_indexer=indexer, metas=test_metas) as new_indexer:
assert new_indexer.compress_level == compress_level
assert new_indexer.index_filename == index_filename
assert isinstance(indexer, NumpyIndexer)
if compress_level == 0:
assert isinstance(new_indexer.query_handler, np.memmap)
idx, dist = new_indexer.query(queries, top_k=2)
np.testing.assert_equal(idx, np.array([[4, 5], [5, 4], [6, 5], [7, 6]]))
assert idx.shape == dist.shape
assert idx.shape == (4, 2)
np.testing.assert_equal(new_indexer.query_by_id([7, 4]), vectors[[3, 0]])

0 comments on commit 692c2d9

Please sign in to comment.