Skip to content

Commit

Permalink
fix: sharding parallel one (#1657)
Browse files Browse the repository at this point in the history
* fix: sharding parallel one

* fix: pea workspace

* refactor: format executor init

* fix: sharding file name

* refactor: remove unused code
  • Loading branch information
florian-hoenicke committed Jan 13, 2021
1 parent 5e1a423 commit cd7d4ad
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 7 deletions.
5 changes: 3 additions & 2 deletions jina/executors/__init__.py
Expand Up @@ -112,7 +112,8 @@ def __init__(awesomeness = 5):
"""
store_args_kwargs = False #: set this to ``True`` to save ``args`` (in a list) and ``kwargs`` (in a map) in YAML config
exec_methods = ['encode', 'add', 'query', 'craft', 'segment', 'score', 'evaluate', 'predict', 'query_by_id', 'delete', 'update']
exec_methods = ['encode', 'add', 'query', 'craft', 'segment', 'score', 'evaluate', 'predict', 'query_by_id',
'delete', 'update']

def __init__(self, *args, **kwargs):
if isinstance(args, tuple) and len(args) > 0:
Expand Down Expand Up @@ -257,7 +258,7 @@ def current_workspace(self) -> str:
:return: if ``separated_workspace`` is set to ``False`` then ``metas.workspace`` is returned,
otherwise the ``metas.pea_workspace`` is returned
"""
work_dir = self.pea_workspace if self.separated_workspace else self.workspace # type: str
work_dir = self.pea_workspace if self.separated_workspace and self.pea_id != -1 else self.workspace # type: str
return work_dir

def get_file_from_workspace(self, name: str) -> str:
Expand Down
2 changes: 1 addition & 1 deletion jina/jaml/parsers/executor/legacy.py
Expand Up @@ -12,7 +12,7 @@ class LegacyParser(VersionedYAMLParser):
@staticmethod
def _get_dump_path_from_config(meta_config: Dict):
if 'name' in meta_config:
if meta_config.get('separated_workspace', False) is True:
if meta_config.get('separated_workspace', False) is True and meta_config['pea_id'] != -1:
if 'pea_id' in meta_config and isinstance(meta_config['pea_id'], int):
work_dir = meta_config['pea_workspace']
dump_path = os.path.join(work_dir, f'{meta_config["name"]}.{"bin"}')
Expand Down
Expand Up @@ -30,13 +30,14 @@ def test_unique_indexing_vecindexers(random_workspace, restful):
assert vector_indexer.size == num_uniq_docs


@pytest.mark.parametrize('separated_workspace', [False, True])
@pytest.mark.parametrize('restful', [False, True])
def test_unique_indexing_docindexers(random_workspace, restful):
def test_unique_indexing_docindexers(random_workspace, restful, separated_workspace):
total_docs = 10
duplicate_docs, num_uniq_docs = get_duplicate_docs(num_docs=total_docs)

f = (Flow(restful=restful)
.add(uses=os.path.join(cur_dir, 'uniq_docindexer.yml'), shards=1))
.add(uses=os.path.join(cur_dir, 'uniq_docindexer.yml'), shards=1, separated_workspace=separated_workspace))

with f:
f.index(duplicate_docs)
Expand Down
2 changes: 0 additions & 2 deletions tests/unit/test_index.py
Expand Up @@ -157,9 +157,7 @@ def test_index(test_workspace_index):
f = Flow().add(uses=os.path.join(cur_dir, 'yaml/test-index.yml'), parallel=3, separated_workspace=True)
with f:
f.index(input_fn=random_docs(50))

for j in range(3):
assert os.path.join(test_workspace_index, f'test2-{j + 1}/test2.bin')
assert os.path.exists(os.path.join(test_workspace_index, f'test2-{j + 1}/tmp2'))


Expand Down

0 comments on commit cd7d4ad

Please sign in to comment.