test(daemon): add distribute tests (#1727)
* test(daemon): add distribute tests
hanxiao committed Jan 20, 2021
1 parent 6d20552 commit 5a78355
Showing 7 changed files with 163 additions and 23 deletions.
Empty file.
17 changes: 17 additions & 0 deletions tests/distributed/test_against_external_daemon/mwu_encoder.py
@@ -0,0 +1,17 @@
from typing import Any

import numpy as np

from jina.executors.encoders import BaseEncoder


class MWUEncoder(BaseEncoder):

    def __init__(self, greetings: str, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._greetings = greetings
        self.logger.success(f'look at me! {greetings}')

    def encode(self, data: Any, *args, **kwargs) -> Any:
        self.logger.info(f'{self._greetings} {data}')
        return np.random.random([data.shape[0], 3])
7 changes: 7 additions & 0 deletions tests/distributed/test_against_external_daemon/mwu_encoder.yml
@@ -0,0 +1,7 @@
!MWUEncoder
with:
  greetings: im from internal yaml!
metas:
  name: my-mwu-encoder
  py_modules: mwu_encoder.py
  workspace: ./
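
For orientation, here is a minimal, hypothetical sketch of how this YAML and the encoder above could be exercised in a purely local Flow, with no daemon involved; it assumes the working directory is tests/distributed/test_against_external_daemon and simply prints the response:

    import numpy as np
    from jina import Flow

    # 'uses' points at the YAML above; its py_modules entry makes MWUEncoder importable
    f = Flow().add(uses='mwu_encoder.yml')
    with f:
        # index a small random batch; on_done receives the response object
        f.index_ndarray(np.random.random([10, 100]), on_done=print)
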
116 changes: 116 additions & 0 deletions tests/distributed/test_against_external_daemon/test_single_instance.py
@@ -0,0 +1,116 @@
import os

import numpy as np
import pytest

from jina import Flow

CLOUD_HOST = 'cloud.jina.ai:8000'  # consider it as the staged version
NUM_DOCS = 100


@pytest.mark.parametrize('silent_log', [True, False])
@pytest.mark.parametrize('parallels', [1, 2])
def test_r_l_simple(silent_log, parallels, mocker):
    response_mock = mocker.Mock()
    f = (Flow(expose_public=True, parallel=parallels)
         .add(host=CLOUD_HOST,
              parallel=parallels,
              silent_remote_logs=silent_log)
         .add(parallel=parallels))
    with f:
        f.index(('hello' for _ in range(NUM_DOCS)), on_done=response_mock)

    response_mock.assert_called()


@pytest.mark.parametrize('silent_log', [True, False])
@pytest.mark.parametrize('parallels', [1, 2])
def test_l_r_simple(silent_log, parallels, mocker):
    response_mock = mocker.Mock()

    f = (Flow(expose_public=True, parallel=parallels)
         .add(parallel=parallels)
         .add(host=CLOUD_HOST,
              parallel=parallels,
              silent_remote_logs=silent_log)
         )
    with f:
        f.index(('hello' for _ in range(NUM_DOCS)), on_done=response_mock)
    response_mock.assert_called()


@pytest.mark.parametrize('silent_log', [True, False])
@pytest.mark.parametrize('parallels', [1, 2])
def test_r_l_r_simple(silent_log, parallels, mocker):
    response_mock = mocker.Mock()

    f = (Flow(expose_public=True)
         .add(host=CLOUD_HOST,
              parallel=parallels,
              silent_remote_logs=silent_log)
         .add()
         .add(host=CLOUD_HOST,
              parallel=parallels,
              silent_remote_logs=silent_log)
         )
    with f:
        f.index(('hello' for _ in range(NUM_DOCS)), on_done=response_mock)
    response_mock.assert_called()


@pytest.mark.parametrize('silent_log', [True, False])
@pytest.mark.parametrize('parallels', [1, 2])
def test_r_r_r_simple(silent_log, parallels, mocker):
    response_mock = mocker.Mock()

    f = (Flow(expose_public=True)
         .add(host=CLOUD_HOST,
              parallel=parallels,
              silent_remote_logs=silent_log)
         .add(host=CLOUD_HOST,
              parallel=parallels,
              silent_remote_logs=silent_log)
         .add(host=CLOUD_HOST,
              parallel=parallels,
              silent_remote_logs=silent_log)
         )
    with f:
        f.index(('hello' for _ in range(NUM_DOCS)), on_done=response_mock)
    response_mock.assert_called()


@pytest.mark.parametrize('silent_log', [True, False])
@pytest.mark.parametrize('parallels', [1, 2])
def test_l_r_l_simple(silent_log, parallels, mocker):
    response_mock = mocker.Mock()

    f = (Flow(expose_public=True)
         .add()
         .add(host=CLOUD_HOST,
              parallel=parallels,
              silent_remote_logs=silent_log)
         .add()
         )
    with f:
        f.index(('hello' for _ in range(NUM_DOCS)), on_done=response_mock)
    response_mock.assert_called()


@pytest.mark.skipif('GITHUB_WORKFLOW' in os.environ,
                    reason='somehow this upload test does not work on Github action, but locally it works fine!')
@pytest.mark.parametrize('silent_log', [True, False])
@pytest.mark.parametrize('parallels', [1, 2])
def test_l_r_l_with_upload(silent_log, parallels, mocker):
    response_mock = mocker.Mock()
    f = (Flow()
         .add()
         .add(uses='mwu_encoder.yml',
              host=CLOUD_HOST,
              parallel=parallels,
              upload_files=['mwu_encoder.py'],
              silent_remote_logs=silent_log)
         .add())
    with f:
        f.index_ndarray(np.random.random([NUM_DOCS, 100]), on_done=response_mock)
    response_mock.assert_called()
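
In the test names above, 'r' and 'l' appear to mark whether each pod of the Flow runs on the remote daemon (host=CLOUD_HOST) or locally. For contrast, a hedged sketch of the topology from test_r_l_simple run entirely locally, with a plain callback standing in for mocker.Mock():

    import numpy as np
    from jina import Flow

    def check(resp):
        # stand-in for the mock: the tests above only assert that the callback fired
        print('got a response')

    # both pods local, so host=CLOUD_HOST and silent_remote_logs are dropped
    f = Flow(parallel=2).add(parallel=2).add(parallel=2)
    with f:
        f.index(('hello' for _ in range(100)), on_done=check)
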
16 changes: 8 additions & 8 deletions tests/integration/crud/test_crud.py
@@ -71,7 +71,7 @@ def validate_results(resp):
    mock = mocker.Mock()
    with Flow.load_config(flow_file) as search_flow:
        search_flow.search(input_fn=random_docs(0, NUMBER_OF_SEARCHES),
-                          output_fn=validate_result_factory(TOPK))
+                          on_done=validate_result_factory(TOPK))
    mock.assert_called_once()

    with Flow.load_config(flow_file) as index_flow:
@@ -81,7 +81,7 @@ def validate_results(resp):
    mock = mocker.Mock()
    with Flow.load_config(flow_file) as search_flow:
        search_flow.search(input_fn=random_docs(0, NUMBER_OF_SEARCHES),
-                          output_fn=validate_result_factory(0))
+                          on_done=validate_result_factory(0))
    mock.assert_called_once()


@@ -102,7 +102,7 @@ def validate_results(resp):
    mock = mocker.Mock()
    with Flow.load_config(flow_file) as search_flow:
        search_flow.search(input_fn=chain(random_docs(2, 5), random_docs(100, 120)),
-                          output_fn=validate_result_factory(3))
+                          on_done=validate_result_factory(3))
    mock.assert_called_once()

    with Flow.load_config(flow_file) as index_flow:
@@ -112,7 +112,7 @@ def validate_results(resp):
    mock = mocker.Mock()
    with Flow.load_config(flow_file) as search_flow:
        search_flow.search(input_fn=random_docs(2, 4),
-                          output_fn=validate_result_factory(1))
+                          on_done=validate_result_factory(1))
    mock.assert_called_once()


@@ -154,7 +154,7 @@ def validate_results(resp):
    with Flow.load_config(flow_file) as search_flow:
        search_docs = list(random_docs(0, NUMBER_OF_SEARCHES))
        search_flow.search(input_fn=search_docs,
-                          output_fn=validate_result_factory(has_changed=False))
+                          on_done=validate_result_factory(has_changed=False))
    mock.assert_called_once()

    with Flow.load_config(flow_file) as index_flow:
@@ -164,7 +164,7 @@ def validate_results(resp):
    mock = mocker.Mock()
    with Flow.load_config(flow_file) as search_flow:
        search_flow.search(input_fn=random_docs(0, NUMBER_OF_SEARCHES),
-                          output_fn=validate_result_factory(has_changed=True))
+                          on_done=validate_result_factory(has_changed=True))
    mock.assert_called_once()


@@ -188,7 +188,7 @@ def validate_results(resp):
    with Flow.load_config(flow_file) as search_flow:
        search_docs = list(random_docs(0, NUMBER_OF_SEARCHES))
        search_flow.search(input_fn=search_docs,
-                          output_fn=validate_results)
+                          on_done=validate_results)
    mock.assert_called_once()

    with Flow.load_config(flow_file) as index_flow:
@@ -198,5 +198,5 @@ def validate_results(resp):
    mock = mocker.Mock()
    with Flow.load_config(flow_file) as search_flow:
        search_flow.search(input_fn=random_docs(0, NUMBER_OF_SEARCHES),
-                          output_fn=validate_results)
+                          on_done=validate_results)
    mock.assert_called_once()
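
Every hunk in this file (and in the two files that follow) makes the same one-line change: the result callback is now passed as on_done instead of the old output_fn keyword. A minimal, self-contained sketch of the call style the tests migrate to; the empty Flow and the string queries here are placeholders, not the YAML-driven Flows the tests actually load:

    from jina import Flow

    def validate(resp):
        # stand-in for validate_result_factory(...): the real tests count matches here
        print(resp)

    with Flow().add() as search_flow:
        search_flow.search(input_fn=(f'query {i}' for i in range(3)),
                           on_done=validate)  # formerly output_fn=validate
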
24 changes: 12 additions & 12 deletions tests/integration/crud_corrupted_docs/test_crud_corrupted_docs.py
@@ -64,7 +64,7 @@ def validate_results(resp):
    mock = mocker.Mock()
    with f:
        f.search(input_fn=docs_search,
-                output_fn=validate_result_factory(EXPECTED_ONLY_TAGS_RESULTS))
+                on_done=validate_result_factory(EXPECTED_ONLY_TAGS_RESULTS))
    mock.assert_called_once()

    # this won't increase the index size as the ids are new
@@ -75,7 +75,7 @@ def validate_results(resp):
    mock = mocker.Mock()
    with f:
        f.search(input_fn=docs_search,
-                output_fn=validate_result_factory(EXPECTED_ONLY_TAGS_RESULTS))
+                on_done=validate_result_factory(EXPECTED_ONLY_TAGS_RESULTS))
    mock.assert_called_once()

    with f:
@@ -86,7 +86,7 @@ def validate_results(resp):
    mock = mocker.Mock()
    with f:
        f.search(input_fn=docs_search,
-                output_fn=validate_result_factory(0))
+                on_done=validate_result_factory(0))
    mock.assert_called_once()


@@ -159,7 +159,7 @@ def validate_results(resp):
    mock = mocker.Mock()
    with f:
        f.search(input_fn=docs_search,
-                output_fn=validate_result_factory(TOPK))
+                on_done=validate_result_factory(TOPK))
    mock.assert_called_once()

    # this won't increase the index size as the ids are new
@@ -170,7 +170,7 @@ def validate_results(resp):
    mock = mocker.Mock()
    with f:
        f.search(input_fn=docs_search,
-                output_fn=validate_result_factory(TOPK))
+                on_done=validate_result_factory(TOPK))
    mock.assert_called_once()

    with f:
@@ -180,7 +180,7 @@ def validate_results(resp):
    mock = mocker.Mock()
    with f:
        f.search(input_fn=docs_search,
-                output_fn=validate_result_factory(0))
+                on_done=validate_result_factory(0))
    mock.assert_called_once()


@@ -226,7 +226,7 @@ def validate_results(resp):
    mock = mocker.Mock()
    with f_query:
        f_query.search(input_fn=docs_search,
-                      output_fn=validate_result_factory(TOPK))
+                      on_done=validate_result_factory(TOPK))
    mock.assert_called_once()

    # this won't increase the index size as the ids are new
@@ -237,7 +237,7 @@ def validate_results(resp):
    mock = mocker.Mock()
    with f_query:
        f_query.search(input_fn=docs_search,
-                      output_fn=validate_result_factory(TOPK))
+                      on_done=validate_result_factory(TOPK))
    mock.assert_called_once()

    with f_index:
@@ -247,7 +247,7 @@ def validate_results(resp):
    mock = mocker.Mock()
    with f_query:
        f_query.search(input_fn=docs_search,
-                      output_fn=validate_result_factory(0))
+                      on_done=validate_result_factory(0))
    mock.assert_called_once()


@@ -299,7 +299,7 @@ def validate_results(resp):
    with f_query:
        f_query.search(input_fn=docs_search,
                       # 0 because search docs have wrong shape
-                      output_fn=validate_result_factory(0))
+                      on_done=validate_result_factory(0))
    mock.assert_called_once()

    # this won't increase the index size as the ids are new
@@ -311,7 +311,7 @@ def validate_results(resp):
    with f_query:
        f_query.search(input_fn=docs_search,
                       # 0 because search docs have wrong shape
-                      output_fn=validate_result_factory(0))
+                      on_done=validate_result_factory(0))
    mock.assert_called_once()

    with f_index:
@@ -321,5 +321,5 @@ def validate_results(resp):
    mock = mocker.Mock()
    with f_query:
        f_query.search(input_fn=docs_search,
-                      output_fn=validate_result_factory(0))
+                      on_done=validate_result_factory(0))
    mock.assert_called_once()
6 changes: 3 additions & 3 deletions tests/integration/docidcache/test_crud_cache.py
@@ -257,7 +257,7 @@ def validate_results(resp):
    with flow_query as f:
        f.search(
            search_docs,
-            output_fn=validate_result_factory(TOP_K)
+            on_done=validate_result_factory(TOP_K)
        )
    mock.assert_called_once()

@@ -287,7 +287,7 @@ def validate_results(resp):
    with flow_query as f:
        f.search(
            search_docs,
-            output_fn=validate_result_factory(TOP_K)
+            on_done=validate_result_factory(TOP_K)
        )
    mock.assert_called_once()

@@ -302,6 +302,6 @@ def validate_results(resp):
    with flow_query as f:
        f.search(
            search_docs,
-            output_fn=validate_result_factory(0)
+            on_done=validate_result_factory(0)
        )
    mock.assert_called_once()
