feat: remove separated workspace param (#1722)
* feat: remove separated workspace param

* fix: submodule

* fix: cache test

* feat: remove pea_workspace (#1739)

* test: compound workspace

* fix: first test workspace changed

* feat: add compound_workspace and compound_name metas

* test: add test with ref_indexer

* feat: add ref_indexer boolean as meta

* feat: remove ref_indexer as meta

* test: add more tests in integration ref_indexer

* test: add test with container

* test: need to pass docker_kwargs to pass env to docker

* fix: fix uses internal

* fix: fix test crud

* fix: consider workspace default None

* fix: update jina/executors/__init__.py

Co-authored-by: Joan Fontanals <joan.martinez@jina.ai>
florian-hoenicke and JoanFM committed Jan 22, 2021
1 parent 8878ccd commit 75f9569
Showing 100 changed files with 705 additions and 291 deletions.
1 change: 0 additions & 1 deletion .github/i18n/README.de.md
@@ -178,7 +178,6 @@ pods:
chunk_idx:
uses: helloworld.indexer.chunk.yml
replicas: $SHARDS
separated_workspace: true
join_all:
uses: _pass
needs: [doc_idx, chunk_idx]
6 changes: 2 additions & 4 deletions .github/i18n/README.es.md
@@ -142,8 +142,7 @@ from jina.flow import Flow

f = (Flow()
.add(uses='encoder.yml', parallel=2)
.add(uses='indexer.yml', shards=2,
separated_workspace=True))
.add(uses='indexer.yml', shards=2))

with f:
f.index(fashion_mnist, batch_size=1024)
@@ -161,7 +160,6 @@ pods:
index:
uses: indexer.yml
shards: 2
separated_workspace: true
```

</td>
@@ -182,7 +180,7 @@ pods:
from jina.flow import Flow

f = (Flow().add(uses='encoder.yml', parallel=2)
.add(uses='indexer.yml', shards=2, separated_workspace=True))
.add(uses='indexer.yml', shards=2))
```

#### [Distribuyendo Flow](https://docs.jina.ai/chapters/remote/index.html)
1 change: 0 additions & 1 deletion .github/i18n/README.fr.md
@@ -172,7 +172,6 @@ pods:
chunk_idx:
uses: helloworld.indexer.chunk.yml
replicas: $SHARDS
separated_workspace: true
join_all:
uses: _pass
needs: [doc_idx, chunk_idx]
1 change: 0 additions & 1 deletion .github/i18n/README.ja.md
@@ -171,7 +171,6 @@ pods:
chunk_idx:
uses: helloworld.indexer.chunk.yml
replicas: $SHARDS
separated_workspace: true
join_all:
uses: _pass
needs: [doc_idx, chunk_idx]
6 changes: 2 additions & 4 deletions .github/i18n/README.kr.md
@@ -141,8 +141,7 @@ from jina.flow import Flow

f = (Flow()
.add(uses='encoder.yml', parallel=2)
.add(uses='indexer.yml', shards=2,
separated_workspace=True))
.add(uses='indexer.yml', shards=2))

with f:
f.index(fashion_mnist, batch_size=1024)
@@ -160,7 +159,6 @@ pods:
index:
uses: indexer.yml
shards: 2
separated_workspace: true
```

</td>
@@ -181,7 +179,7 @@ pods:
from jina.flow import Flow

f = (Flow().add(uses='encoder.yml', parallel=2)
.add(uses='indexer.yml', shards=2, separated_workspace=True))
.add(uses='indexer.yml', shards=2))
```

#### [플로우 배포](https://docs.jina.ai/chapters/remote/index.html)
6 changes: 2 additions & 4 deletions .github/i18n/README.pt_br.md
@@ -139,8 +139,7 @@ from jina.flow import Flow

f = (Flow()
.add(uses='encoder.yml', parallel=2)
.add(uses='indexer.yml', shards=2,
separated_workspace=True))
.add(uses='indexer.yml', shards=2))

with f:
f.index(fashion_mnist, batch_size=1024)
@@ -158,7 +157,6 @@ pods:
index:
uses: indexer.yml
shards: 2
separated_workspace: true
```

</td>
@@ -179,7 +177,7 @@ pods:
from jina.flow import Flow

f = (Flow().add(uses='encoder.yml', parallel=2)
.add(uses='indexer.yml', shards=2, separated_workspace=True))
.add(uses='indexer.yml', shards=2))
```

#### [Distribuindo Flow](https://docs.jina.ai/chapters/remote/index.html)
1 change: 0 additions & 1 deletion .github/i18n/README.ru.md
@@ -171,7 +171,6 @@ pods:
chunk_idx:
uses: helloworld.indexer.chunk.yml
replicas: $SHARDS
separated_workspace: true
join_all:
uses: _pass
needs: [doc_idx, chunk_idx]
6 changes: 2 additions & 4 deletions .github/i18n/README.uk.md
@@ -140,8 +140,7 @@ from jina.flow import Flow

f = (Flow()
.add(uses='encoder.yml', parallel=2)
.add(uses='indexer.yml', shards=2,
separated_workspace=True))
.add(uses='indexer.yml', shards=2))

with f:
f.index(fashion_mnist, batch_size=1024)
@@ -159,7 +158,6 @@ pods:
index:
uses: indexer.yml
shards: 2
separated_workspace: true
```

</td>
@@ -180,7 +178,7 @@ pods:
from jina.flow import Flow

f = (Flow().add(uses='encoder.yml', parallel=2)
.add(uses='indexer.yml', shards=2, separated_workspace=True))
.add(uses='indexer.yml', shards=2))
```

#### [Розподіл потоку](https://docs.jina.ai/chapters/remote/index.html)
1 change: 0 additions & 1 deletion .github/i18n/README.zh.md
@@ -173,7 +173,6 @@ pods:
chunk_idx:
uses: helloworld.indexer.chunk.yml
replicas: $SHARDS
separated_workspace: true
join_all:
uses: _pass
needs: [doc_idx, chunk_idx]
3 changes: 1 addition & 2 deletions README.md
@@ -412,7 +412,7 @@ c.components = lambda: [a, b]
Now let's add our indexer YAML file to the Flow with `.add(uses=)`. Let's also add two shards to the indexer to improve its scalability:

```python
f = Flow().add(uses='MyEncoder', parallel=2).add(uses='myindexer.yml', shards=2, separated_workspace=True).plot()
f = Flow().add(uses='MyEncoder', parallel=2).add(uses='myindexer.yml', shards=2).plot()
```

<img src="https://github.com/jina-ai/jina/blob/master/.github/simple-flow1.svg?raw=true"/>
@@ -429,7 +429,6 @@ pods:
- name:index
uses: myindexer.yml
shards: 2
separated_workspace: true
```

And then load it in Python:
1 change: 0 additions & 1 deletion docs/chapters/flow/index.md
@@ -268,7 +268,6 @@ pods:
chunk_idx:
uses: index/chunk.yml
replicas: $SHARDS
separated_workspace: true
join_all:
uses: _pass
needs: [doc_idx, chunk_idx]
1 change: 0 additions & 1 deletion docs/chapters/helloworld/index.rst
@@ -65,7 +65,6 @@ And the implementation behind it? It's simple:
index:
uses: helloworld.indexer.yml
shards: 2
separated_workspace: true
.. confval:: Flow in Dashboard
1 change: 0 additions & 1 deletion jina/enums.py
@@ -23,7 +23,6 @@
chunk_idx:
uses: index/chunk.yml
parallel: ${{PARALLEL}}
separated_workspace: true
parallel_type: !PollingType ANY
# or
parallel_type: ANY
51 changes: 34 additions & 17 deletions jina/executors/__init__.py
@@ -29,7 +29,6 @@
_ref_desolve_map = SimpleNamespace()
_ref_desolve_map.__dict__['metas'] = SimpleNamespace()
_ref_desolve_map.__dict__['metas'].__dict__['pea_id'] = 0
_ref_desolve_map.__dict__['metas'].__dict__['separated_workspace'] = False


class ExecutorType(type(JAMLCompatible), type):
@@ -241,30 +240,52 @@ def config_abspath(self) -> str:
"""
return self.get_file_from_workspace(f'{self.name}.yml')

@staticmethod
def get_shard_workspace(workspace_folder: str, workspace_name: str, pea_id: int) -> str:
# TODO (Joan, Florian). We would prefer not to keep `pea_id` condition, but afraid many tests rely on this
return os.path.join(workspace_folder, f'{workspace_name}-{pea_id}') if pea_id > 0 else workspace_folder

@property
def workspace_name(self):
return self.name

@property
def _workspace(self):
""" Property to access `workspace` if existing or default to `./`. Useful to provide good interface when
using executors directly in python.
.. highlight:: python
.. code-block:: python
with NumpyIndexer() as indexer:
indexer.touch()
:return: returns the workspace property of the executor or default to './'
"""
return self.workspace or './'

@property
def current_workspace(self) -> str:
""" Get the path of the current workspace.
def shard_workspace(self) -> str:
""" Get the path of the current shard.
:return: if ``separated_workspace`` is set to ``False`` then ``metas.workspace`` is returned,
otherwise the ``metas.pea_workspace`` is returned
:return: returns the workspace of the shard of this Executor
"""
work_dir = self.pea_workspace if self.separated_workspace and self.pea_id != -1 else self.workspace # type: str
return work_dir
return BaseExecutor.get_shard_workspace(self._workspace, self.workspace_name, self.pea_id)

def get_file_from_workspace(self, name: str) -> str:
"""Get a usable file path under the current workspace
:param name: the name of the file
:return depending on ``metas.separated_workspace`` the file could be located in ``metas.workspace`` or ``metas.pea_workspace``
:return file path
"""
Path(self.current_workspace).mkdir(parents=True, exist_ok=True)
return os.path.join(self.current_workspace, name)
Path(self.shard_workspace).mkdir(parents=True, exist_ok=True)
return os.path.join(self.shard_workspace, name)

@property
def physical_size(self) -> int:
"""Return the size of the current workspace in bytes"""
root_directory = Path(self.current_workspace)
root_directory = Path(self.shard_workspace)
return sum(f.stat().st_size for f in root_directory.glob('**/*') if f.is_file())

def __getstate__(self):
@@ -299,7 +320,7 @@ def touch(self) -> None:

def save(self, filename: str = None):
"""
Persist data of this executor to the :attr:`workspace` (or :attr:`pea_workspace`). The data could be
Persist data of this executor to the :attr:`shard_workspace`. The data could be
a file or collection of files produced/used during an executor run.
These are some of the common data that you might want to persist:
@@ -345,23 +366,19 @@ def save(self, filename: str = None):
@classmethod
def inject_config(cls: Type[AnyExecutor],
raw_config: Dict,
separated_workspace: bool = False,
pea_id: int = 0,
read_only: bool = False,
*args, **kwargs) -> Dict:
"""Inject config into the raw_config before loading into an object.
:param raw_config: raw config to work on
:param separated_workspace: the dump and data files associated to this executor will be stored separately for
each parallel pea, which will be indexed by the ``pea_id``
:param pea_id: the id of the storage of this parallel pea, only effective when ``separated_workspace=True``
:param pea_id: the id of the storage of this parallel pea
:param read_only: if the executor should be readonly
:return: an executor object
"""
if 'metas' not in raw_config:
raw_config['metas'] = {}
tmp = fill_metas_with_defaults(raw_config)
tmp['metas']['separated_workspace'] = separated_workspace
tmp['metas']['pea_id'] = pea_id
tmp['metas']['read_only'] = read_only
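
Note on the hunks above: with `separated_workspace` and `pea_workspace` removed, the shard folder is derived from the workspace, the workspace name and `pea_id` alone. The following is a minimal standalone sketch of that rule, restating `get_shard_workspace` as shown in this diff; the function name `sketch_shard_workspace` and the sample paths are illustrative assumptions, not part of the commit.

```python
import os


def sketch_shard_workspace(workspace_folder: str, workspace_name: str, pea_id: int) -> str:
    """Illustrative copy of the rule in BaseExecutor.get_shard_workspace (see diff above)."""
    # Only shards with pea_id > 0 get a dedicated '<name>-<pea_id>' subfolder;
    # pea_id 0 (and negative ids) fall back to the workspace folder itself.
    if pea_id > 0:
        return os.path.join(workspace_folder, f'{workspace_name}-{pea_id}')
    return workspace_folder


# Hypothetical values, chosen only to show both branches.
print(sketch_shard_workspace('workspace', 'indexer', 0))
print(sketch_shard_workspace('workspace', 'indexer', 2))
```

As the TODO in the diff notes, the `pea_id > 0` condition is kept for compatibility with existing tests, so the first shard shares the plain workspace folder.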

27 changes: 20 additions & 7 deletions jina/executors/compound.py
@@ -93,7 +93,7 @@ class CompoundExecutor(BaseExecutor):
c[0].add(obj)
.. note::
All components ``workspace`` and ``pea_workspace`` are overrided by their :class:`CompoundExecutor` counterparts.
Component ``workspace`` and ``pea_id`` are overridden by their :class:`CompoundExecutor` counterparts.
.. warning::
@@ -192,6 +192,10 @@ def sayB(self):
- dummyB-e3acc910
- say
.. warning::
When setting inner `executors` in `components` the `workspace` configuration will not be used and will be overridden
by a workspace extracted considering the name of the `CompoundExecutor`, the name of each internal `Component` and the `pea_id`
"""
super().__init__(*args, **kwargs)
self._components = None # type: List[AnyExecutor]
@@ -248,22 +252,31 @@ def components(self, comps: Callable[[], List]) -> None:
if not callable(comps):
raise TypeError('components must be a callable function that returns '
'a List[BaseExecutor]')
if not getattr(self, 'init_from_yaml', False):

# Important to handle when loading a CompoundExecutor when `inner` executors have not been loaded from yaml
if not getattr(self, '_init_from_yaml', False):
self._components = comps()
if not isinstance(self._components, list):
raise TypeError(f'components expect a list of executors, receiving {type(self._components)!r}')
# self._set_comp_workspace()
self._set_comp_workspace()
self._set_routes()
self._resolve_routes()
else:
self.logger.debug('components is omitted from construction, as it is initialized from yaml config')

@staticmethod
def get_component_workspace_from_compound_workspace(compound_workspace: str, compound_name: str, pea_id: int) -> str:
import os
return BaseExecutor.get_shard_workspace(compound_workspace, compound_name, pea_id) if pea_id > 0 else \
os.path.join(compound_workspace, compound_name)

def _set_comp_workspace(self) -> None:
# overrider the workspace setting for all components
# overrides the workspace setting for all components
for c in self.components:
c.separated_workspace = self.separated_workspace
c.workspace = self.workspace
c.pea_workspace = self.current_workspace
if not c.workspace and self.workspace:
c_workspace = CompoundExecutor.get_component_workspace_from_compound_workspace(self.workspace, self.name, self.pea_id)
self.logger.warning(f'Setting workspace of {c.name} to {c_workspace}')
c.workspace = c_workspace

def _resolve_routes(self) -> None:
if self._routes:
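
Note on the hunk above: a component that has no workspace of its own is given one derived from the compound's workspace, the compound's name and `pea_id`. The sketch below restates the logic of `get_component_workspace_from_compound_workspace` from this diff as a standalone function; the name `sketch_component_workspace` and the sample values are assumptions made for illustration.

```python
import os


def sketch_component_workspace(compound_workspace: str, compound_name: str, pea_id: int) -> str:
    """Illustrative copy of the component workspace rule shown in the diff above."""
    if pea_id > 0:
        # Same rule as the shard workspace: '<workspace>/<compound_name>-<pea_id>'
        return os.path.join(compound_workspace, f'{compound_name}-{pea_id}')
    # Without a positive pea_id, components live under '<workspace>/<compound_name>'
    return os.path.join(compound_workspace, compound_name)


# Hypothetical values, chosen only to show both branches.
print(sketch_component_workspace('workspace', 'compound', 0))
print(sketch_component_workspace('workspace', 'compound', 3))
```

Unlike the plain shard rule, the compound name is always part of the component path, which keeps the components of different compounds apart inside a shared workspace.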
1 change: 1 addition & 0 deletions jina/executors/evaluators/__init__.py
@@ -35,6 +35,7 @@ def variance(self) -> float:


class FileBasedEvaluator(CompoundExecutor):

"""A Frequently used pattern for combining A :class:`BinaryPbIndexer` and :class:`BaseEvaluator`.
It will be equipped with predefined ``requests.on`` behaviors:
8 changes: 7 additions & 1 deletion jina/executors/indexers/vector.py
@@ -50,6 +50,7 @@ def __init__(self,
self.key_bytes = b''
self.key_dtype = None
self.valid_indices = np.array([], dtype=bool)
self.ref_indexer_workspace_name = None

if ref_indexer:
# copy the header info of the binary file
@@ -69,6 +70,11 @@ def __init__(self,
f'dtype extracted from `ref_indexer` to {ref_indexer.dtype} \n'
f'compress_level overriden from `ref_indexer` to {ref_indexer.compress_level} \n'
f'index_filename overriden from `ref_indexer` to {ref_indexer.index_filename}')
self.ref_indexer_workspace_name = ref_indexer.workspace_name

@property
def workspace_name(self):
return self.name if self.ref_indexer_workspace_name is None else self.ref_indexer_workspace_name

@property
def index_abspath(self) -> str:
@@ -77,7 +83,7 @@ def index_abspath(self) -> str:
Use index_abspath
"""
return getattr(self, '_ref_index_abspath', None) or self.get_file_from_workspace(self.index_filename)
return self.get_file_from_workspace(self.index_filename)

def get_add_handler(self):
"""Open a binary gzip file for adding new vectors
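
Note on the `vector.py` hunks above: an indexer built with a `ref_indexer` reuses that indexer's `workspace_name`, so both presumably resolve to the same shard folder. The class below is a hypothetical stand-in (`SketchIndexer` is not a jina class) that sketches only the `workspace_name` property from this diff.

```python
from typing import Optional


class SketchIndexer:
    """Hypothetical stand-in that models only the workspace_name logic from the diff."""

    def __init__(self, name: str, ref_indexer: Optional['SketchIndexer'] = None):
        self.name = name
        # Stays None unless a ref_indexer is given, mirroring the diff above
        self.ref_indexer_workspace_name = None
        if ref_indexer:
            self.ref_indexer_workspace_name = ref_indexer.workspace_name

    @property
    def workspace_name(self) -> str:
        # Fall back to the indexer's own name when there is no reference indexer
        if self.ref_indexer_workspace_name is None:
            return self.name
        return self.ref_indexer_workspace_name


base = SketchIndexer('base')
derived = SketchIndexer('derived', ref_indexer=base)
print(base.workspace_name, derived.workspace_name)
```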