Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ref indexer avoid getting abspath #1595

Merged
merged 1 commit into from
Jan 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 7 additions & 2 deletions jina/executors/indexers/vector.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ def __init__(self,
self.compress_level = compress_level
self.key_bytes = b''
self.key_dtype = None
self._ref_index_abspath = None
self.valid_indices = np.array([], dtype=bool)

if ref_indexer:
Expand All @@ -62,8 +61,14 @@ def __init__(self,
self._size = ref_indexer._size
# point to the ref_indexer.index_filename
# so that later in `post_init()` it will load from the referred index_filename
self._ref_index_abspath = ref_indexer.index_abspath
self.valid_indices = ref_indexer.valid_indices
self.index_filename = ref_indexer.index_filename
self.logger.warning(f'\n'
f'num_dim extracted from `ref_indexer` to {ref_indexer.num_dim} \n'
f'_size extracted from `ref_indexer` to {ref_indexer._size} \n'
f'dtype extracted from `ref_indexer` to {ref_indexer.dtype} \n'
f'compress_level overriden from `ref_indexer` to {ref_indexer.compress_level} \n'
f'index_filename overriden from `ref_indexer` to {ref_indexer.index_filename}')

@property
def index_abspath(self) -> str:
Expand Down
Empty file.
5 changes: 5 additions & 0 deletions tests/integration/ref_indexer/index.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
!Flow
pods:
indexer:
uses: pods/indexer.yml
parallel: $JINA_TEST_REF_INDEXER_PARALLEL
6 changes: 6 additions & 0 deletions tests/integration/ref_indexer/pods/indexer.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
!NumpyIndexer
with:
index_filename: 'index.gz'
metas:
workspace: $JINA_TEST_INDEXER_WITH_REF_INDEXER
name: wrapidx
12 changes: 12 additions & 0 deletions tests/integration/ref_indexer/pods/indexer_with_ref.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
!NumpyIndexer
with:
ref_indexer:
!NumpyIndexer
with:
index_filename: 'index.gz'
metas:
workspace: $JINA_TEST_INDEXER_WITH_REF_INDEXER_QUERY
name: wrapidx
metas:
name: indexer
workspace: $JINA_TEST_INDEXER_WITH_REF_INDEXER_QUERY
5 changes: 5 additions & 0 deletions tests/integration/ref_indexer/query.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
!Flow
pods:
indexer:
uses: pods/indexer_with_ref.yml
parallel: $JINA_TEST_REF_INDEXER_PARALLEL
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import os
import shutil

import numpy as np
import pytest

from jina.flow import Flow
from jina import Document


@pytest.fixture
def parallel(request):
os.environ['JINA_TEST_REF_INDEXER_PARALLEL'] = str(request.param)
yield
del os.environ['JINA_TEST_REF_INDEXER_PARALLEL']


@pytest.fixture
def index_docs():
docs = []
for idx in range(0, 100):
doc = Document()
doc.id = f'{idx:0>16}'
doc.embedding = doc.embedding = np.array([idx, idx])
docs.append(doc)
return docs


@pytest.fixture
def random_workspace(tmpdir):
os.environ['JINA_TEST_INDEXER_WITH_REF_INDEXER'] = str(tmpdir)
os.environ['JINA_TEST_INDEXER_WITH_REF_INDEXER_QUERY'] = str(tmpdir)
yield
del os.environ['JINA_TEST_INDEXER_WITH_REF_INDEXER']
del os.environ['JINA_TEST_INDEXER_WITH_REF_INDEXER_QUERY']


@pytest.mark.parametrize('parallel', [1, 2], indirect=True)
def test_indexer_with_ref_indexer(random_workspace, parallel, index_docs, mocker):
top_k = 10
with Flow.load_config('index.yml') as index_flow:
index_flow.index(input_fn=index_docs, batch_size=10)

mock = mocker.Mock()

def validate_response(resp):
mock()
assert len(resp.search.docs) == 1
assert len(resp.search.docs[0].matches) == top_k

query_document = Document()
query_document.embedding = np.array([1, 1])
with Flow.load_config('query.yml') as query_flow:
query_flow.search(input_fn=[query_document], on_done=validate_response, top_k=top_k)

mock.assert_called_once()


@pytest.fixture
def random_workspace_move(tmpdir):
os.environ['JINA_TEST_INDEXER_WITH_REF_INDEXER'] = str(tmpdir) + '/index'
os.environ['JINA_TEST_INDEXER_WITH_REF_INDEXER_QUERY'] = str(tmpdir) + '/query'
yield
del os.environ['JINA_TEST_INDEXER_WITH_REF_INDEXER']
del os.environ['JINA_TEST_INDEXER_WITH_REF_INDEXER_QUERY']


@pytest.mark.parametrize('parallel', [1, 2], indirect=True)
def test_indexer_with_ref_indexer_move(random_workspace_move, parallel, index_docs, mocker):
top_k = 10
with Flow.load_config('index.yml') as index_flow:
index_flow.index(input_fn=index_docs, batch_size=10)

mock = mocker.Mock()

shutil.copytree(os.environ['JINA_TEST_INDEXER_WITH_REF_INDEXER'],
os.environ['JINA_TEST_INDEXER_WITH_REF_INDEXER_QUERY'])

shutil.rmtree(os.environ['JINA_TEST_INDEXER_WITH_REF_INDEXER'])

def validate_response(resp):
mock()
assert len(resp.search.docs) == 1
assert len(resp.search.docs[0].matches) == top_k

query_document = Document()
query_document.embedding = np.array([1, 1])
with Flow.load_config('query.yml') as query_flow:
query_flow.search(input_fn=[query_document], on_done=validate_response, top_k=top_k)

mock.assert_called_once()
31 changes: 31 additions & 0 deletions tests/unit/executors/indexers/test_numpyindexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,3 +361,34 @@ def test_numpy_indexer_known_and_delete(batch_size, compress_level, test_metas):
assert idx.shape == dist.shape
assert idx.shape == (len(queries), top_k)
np.testing.assert_equal(indexer.query_by_id([6, 5]), vectors[[2, 1]])


@pytest.mark.parametrize('compress_level', [0, 1, 2, 3])
def test_numpy_indexer_with_ref_indexer(compress_level, test_metas):
vectors = np.array([[1, 1, 1],
[10, 10, 10],
[100, 100, 100],
[1000, 1000, 1000]])
keys = np.array([4, 5, 6, 7]).reshape(-1, 1)
with NumpyIndexer(metric='euclidean', index_filename='np.test.gz', compress_level=compress_level,
metas=test_metas) as indexer:
indexer.add(keys, vectors)
indexer.save()
assert os.path.exists(indexer.index_abspath)
index_filename = indexer.index_filename

queries = np.array([[1, 1, 1],
[10, 10, 10],
[100, 100, 100],
[1000, 1000, 1000]])
with NumpyIndexer(metric='euclidean', ref_indexer=indexer, metas=test_metas) as new_indexer:
assert new_indexer.compress_level == compress_level
assert new_indexer.index_filename == index_filename
assert isinstance(indexer, NumpyIndexer)
if compress_level == 0:
assert isinstance(new_indexer.query_handler, np.memmap)
idx, dist = new_indexer.query(queries, top_k=2)
np.testing.assert_equal(idx, np.array([[4, 5], [5, 4], [6, 5], [7, 6]]))
assert idx.shape == dist.shape
assert idx.shape == (4, 2)
np.testing.assert_equal(new_indexer.query_by_id([7, 4]), vectors[[3, 0]])