From b4ec89efe3a2bd6c6aef1816033292c5505f026c Mon Sep 17 00:00:00 2001 From: Han Xiao Date: Fri, 11 Dec 2020 16:25:50 +0100 Subject: [PATCH 01/11] feat(flow): add needs, inspect, version to flow yaml --- jina/excepts.py | 4 + jina/flow/__init__.py | 50 ++---------- jina/flow/parser/__init__.py | 63 ++++++++++++++++ jina/flow/parser/dumper.py | 84 +++++++++++++++++++++ jina/flow/parser/loader.py | 62 +++++++++++++++ jina/types/sets/document.py | 2 +- tests/unit/flow/test_flow_yaml_parser.py | 48 ++++++++++++ tests/unit/flow/yaml/flow-legacy-syntax.yml | 12 +++ tests/unit/flow/yaml/flow-v1-syntax.yml | 13 ++++ tests/unit/flow/yaml/flow-v1.0-syntax.yml | 14 ++++ tests/unit/flow/yaml/flow-v99-syntax.yml | 13 ++++ 11 files changed, 319 insertions(+), 46 deletions(-) create mode 100644 jina/flow/parser/__init__.py create mode 100644 jina/flow/parser/dumper.py create mode 100644 jina/flow/parser/loader.py create mode 100644 tests/unit/flow/test_flow_yaml_parser.py create mode 100644 tests/unit/flow/yaml/flow-legacy-syntax.yml create mode 100644 tests/unit/flow/yaml/flow-v1-syntax.yml create mode 100644 tests/unit/flow/yaml/flow-v1.0-syntax.yml create mode 100644 tests/unit/flow/yaml/flow-v99-syntax.yml diff --git a/jina/excepts.py b/jina/excepts.py index 06a73f1df5abc..02bd5ec90c228 100644 --- a/jina/excepts.py +++ b/jina/excepts.py @@ -177,3 +177,7 @@ class LengthMismatchException(Exception): class ImageAlreadyExists(Exception): """ Exception when an image with the name, module version, and Jina version already exists on the Hub""" + + +class BadFlowYAMLVersion(Exception): + """ Exception when Flow YAML config specifies a wrong version number""" diff --git a/jina/flow/__init__.py b/jina/flow/__init__.py index bcb03fc2604fc..c40d09bacd7f2 100644 --- a/jina/flow/__init__.py +++ b/jina/flow/__init__.py @@ -55,6 +55,7 @@ def __init__(self, args: Optional['argparse.Namespace'] = None, **kwargs): """ super().__init__() + self._version = '' # YAML version number self._pod_nodes = OrderedDict() # type: Dict[str, 'FlowPod'] self._inspect_pods = {} # type: Dict[str, str] self._build_level = FlowBuildLevel.EMPTY @@ -86,30 +87,8 @@ def to_yaml(cls, representer, data): @staticmethod def _dump_instance_to_yaml(data): # note: we only save non-default property for the sake of clarity - r = {} - - if data._kwargs: - r['with'] = data._kwargs - - if data._pod_nodes: - r['pods'] = {} - - if 'gateway' in data._pod_nodes: - # always dump gateway as the first pod, if exist - r['pods']['gateway'] = {} - - for k, v in data._pod_nodes.items(): - if k == 'gateway': - continue - - kwargs = {'needs': list(v.needs)} if v.needs else {} - kwargs.update(v._kwargs) - - if 'name' in kwargs: - kwargs.pop('name') - - r['pods'][k] = kwargs - return r + from .parser import dump + return dump(data) @classmethod def from_yaml(cls, constructor, node): @@ -164,27 +143,8 @@ def _get_instance_from_yaml(cls, constructor, node): data = ruamel.yaml.constructor.SafeConstructor.construct_mapping( constructor, node, deep=True) - p = data.get('with', {}) # type: Dict[str, Any] - a = p.pop('args') if 'args' in p else () - k = p.pop('kwargs') if 'kwargs' in p else {} - # maybe there are some hanging kwargs in "parameters" - tmp_a = (expand_env_var(v) for v in a) - tmp_p = {kk: expand_env_var(vv) for kk, vv in {**k, **p}.items()} - obj = cls(*tmp_a, **tmp_p) - - pp = data.get('pods', {}) - for pod_name, pod_attr in pp.items(): - p_pod_attr = {kk: expand_env_var(vv) for kk, vv in pod_attr.items()} - if pod_name != 'gateway': - # ignore gateway when reading, it will be added during build() - obj.add(name=pod_name, **p_pod_attr, copy_flow=False) - - obj.logger.success(f'successfully built {cls.__name__} from a yaml config') - - # if node.tag in {'!CompoundExecutor'}: - # os.environ['JINA_WARN_UNNAMED'] = 'YES' - - return obj, data + from .parser import parse + return parse(data), data @staticmethod def _parse_endpoints(op_flow, pod_name, endpoint, connect_to_last_pod=False) -> Set: diff --git a/jina/flow/parser/__init__.py b/jina/flow/parser/__init__.py new file mode 100644 index 0000000000000..9e16ee21a482b --- /dev/null +++ b/jina/flow/parser/__init__.py @@ -0,0 +1,63 @@ +import warnings +from typing import Dict, List + +from ...excepts import BadFlowYAMLVersion + +if False: + from .. import Flow + +loader_prefix = 'load_v_' +dumper_prefix = 'dump_v_' + + +def parse(data: Dict) -> 'Flow': + """Return the Flow YAML parser given the syntax version number + + :param data: flow yaml file loaded as python dict + """ + from . import loader + + version = data.get('version', 'legacy') + v = version.replace('.', '_') + p = getattr(loader, loader_prefix + v, None) + if not p: + p = getattr(loader, loader_prefix + v.split('_')[0], None) + warnings.warn(f'can not find the parser for {version}, ' + f'will use the parser for version: "{v.split("_")[0]}"', UserWarning) + if not p: + raise BadFlowYAMLVersion(f'{version} is not a valid version number') + obj = p(data) + obj._version = version + obj.logger.success(f'successfully built a Flow from a YAML version: {version}') + return obj + + +def dump(data: 'Flow') -> Dict: + """Return the dictionary given a versioned flow object + + :param data: versioned flow object + """ + from . import dumper + version = data._version + v = version.replace('.', '_') + p = getattr(dumper, dumper_prefix + v, None) + if not p: + p = getattr(dumper, dumper_prefix + v.split('_')[0], None) + warnings.warn(f'can not find the dumper for {version}, ' + f'will use the dumper for version: "{v.split("_")[0]}"', UserWarning) + if not p: + raise BadFlowYAMLVersion(f'{version} is not a valid version number') + return p(data) + + +def get_support_versions() -> List[str]: + """List all supported versions + + :return: supported versions sorted alphabetically + """ + from . import loader + result = [] + for v in dir(loader): + if v.startswith(loader_prefix): + result.append(v.replace(loader_prefix, '').replace('_', '.')) + return list(sorted(result)) diff --git a/jina/flow/parser/dumper.py b/jina/flow/parser/dumper.py new file mode 100644 index 0000000000000..869d7a2317a24 --- /dev/null +++ b/jina/flow/parser/dumper.py @@ -0,0 +1,84 @@ +""" +# loader function format + + def dump_v_MAJOR[_MINOR](data) + e.g. + - def dump_v_1_1(data) + - def dump_v_1(data) + +# match priority + if version is available: + - dump_v_MAJOR_MINOR + - dump_v_MAJOR + - throw BadFlowYAMLVersion + otherwise: + - dump_v_legacy +""" + +import argparse +from typing import Dict, Any + +from .. import Flow +from ...parser import set_pod_parser + + +def _get_taboo(): + """Get a set of keys that should not be dumped""" + return {k.dest for k in set_pod_parser()._actions if k.help == argparse.SUPPRESS} + + +def dump_v_legacy(data: 'Flow') -> Dict[str, Any]: + r = {} + if data._version: + r['version'] = data._version + + if data._kwargs: + r['with'] = data._kwargs + + if data._pod_nodes: + r['pods'] = {} + + if 'gateway' in data._pod_nodes: + # always dump gateway as the first pod, if exist + r['pods']['gateway'] = {} + + for k, v in data._pod_nodes.items(): + if k == 'gateway': + continue + + kwargs = {'needs': list(v.needs)} if v.needs else {} + kwargs.update(v._kwargs) + + if 'name' in kwargs: + kwargs.pop('name') + + r['pods'][k] = kwargs + return r + + +def dump_v_1(data: 'Flow') -> Dict[str, Any]: + r = {} + if data._version: + r['version'] = data._version + + if data._kwargs: + r['with'] = data._kwargs + + if data._pod_nodes: + r['pods'] = [] + + last_name = 'gateway' + for k, v in data._pod_nodes.items(): + if k == 'gateway': + continue + kwargs = {} + # only add "needs" when the value is not the last pod name + if list(v.needs) != [last_name]: + kwargs = {'needs': list(v.needs)} + kwargs.update(v._kwargs) + for t in _get_taboo(): + if t in kwargs: + kwargs.pop(t) + last_name = kwargs['name'] + r['pods'].append(kwargs) + return r diff --git a/jina/flow/parser/loader.py b/jina/flow/parser/loader.py new file mode 100644 index 0000000000000..061e1acdeb273 --- /dev/null +++ b/jina/flow/parser/loader.py @@ -0,0 +1,62 @@ +""" +# loader function format + + def load_v_MAJOR[_MINOR](data) + e.g. + - def load_v_1_1(data) + - def load_v_1(data) + +# match priority + if version is available: + - load_v_MAJOR_MINOR + - load_v_MAJOR + - throw BadFlowYAMLVersion + otherwise: + - load_v_legacy +""" + +from typing import Dict, Any + +from .. import Flow +from ...helper import expand_env_var + + +def load_v_legacy(data): + p = data.get('with', {}) # type: Dict[str, Any] + a = p.pop('args') if 'args' in p else () + k = p.pop('kwargs') if 'kwargs' in p else {} + # maybe there are some hanging kwargs in "parameters" + tmp_a = (expand_env_var(v) for v in a) + tmp_p = {kk: expand_env_var(vv) for kk, vv in {**k, **p}.items()} + obj = Flow(*tmp_a, **tmp_p) + + pp = data.get('pods', {}) + for pod_name, pod_attr in pp.items(): + p_pod_attr = {kk: expand_env_var(vv) for kk, vv in pod_attr.items()} + if pod_name != 'gateway': + # ignore gateway when reading, it will be added during build() + obj.add(name=pod_name, **p_pod_attr, copy_flow=False) + # if node.tag in {'!CompoundExecutor'}: + # os.environ['JINA_WARN_UNNAMED'] = 'YES' + return obj + + +def load_v_1(data): + p = data.get('with', {}) # type: Dict[str, Any] + a = p.pop('args') if 'args' in p else () + k = p.pop('kwargs') if 'kwargs' in p else {} + # maybe there are some hanging kwargs in "parameters" + tmp_a = (expand_env_var(v) for v in a) + tmp_p = {kk: expand_env_var(vv) for kk, vv in {**k, **p}.items()} + obj = Flow(*tmp_a, **tmp_p) + + pp = data.get('pods', []) + for pods in pp: + p_pod_attr = {kk: expand_env_var(vv) for kk, vv in pods.items()} + # in v1 YAML, flow is an optional argument + if p_pod_attr.get('name', None) != 'gateway': + # ignore gateway when reading, it will be added during build() + method = p_pod_attr.get('method', 'add') + # support methods: add, needs, inspect + getattr(obj, method)(**p_pod_attr, copy_flow=False) + return obj diff --git a/jina/types/sets/document.py b/jina/types/sets/document.py index 05fd1b6c9166c..c0881d98f8283 100644 --- a/jina/types/sets/document.py +++ b/jina/types/sets/document.py @@ -109,7 +109,7 @@ def all_embeddings(self) -> Tuple['np.ndarray', 'DocumentSet', 'DocumentSet']: def all_contents(self) -> Tuple['np.ndarray', 'DocumentSet', 'DocumentSet']: """Return all embeddings from every document in this set as a ndarray - :return a tuple of embedding in :class:`np.ndarray`, + :return: a tuple of embedding in :class:`np.ndarray`, the corresponding documents in a :class:`DocumentSet`, and the documents have no contents in a :class:`DocumentSet`. """ diff --git a/tests/unit/flow/test_flow_yaml_parser.py b/tests/unit/flow/test_flow_yaml_parser.py new file mode 100644 index 0000000000000..81dcbb59d6bc1 --- /dev/null +++ b/tests/unit/flow/test_flow_yaml_parser.py @@ -0,0 +1,48 @@ +import filecmp +from pathlib import Path + +import numpy as np +import pytest + +from jina import Flow +from jina.excepts import BadFlowYAMLVersion +from jina.flow.parser import get_support_versions + + +def test_support_versions(): + assert get_support_versions() == ['1', 'legacy'] + + +def test_load_legacy_and_v1(): + Flow.load_config('yaml/flow-legacy-syntax.yml') + Flow.load_config('yaml/flow-v1-syntax.yml') + + # this should fallback to v1 + Flow.load_config('yaml/flow-v1.0-syntax.yml') + + with pytest.raises(BadFlowYAMLVersion): + Flow.load_config('yaml/flow-v99-syntax.yml') + + +def test_add_needs_inspect(tmpdir): + f1 = (Flow().add(name='pod0', needs='gateway').add(name='pod1', needs='gateway').inspect().needs(['pod0', 'pod1'])) + f1.plot(Path(tmpdir) / 'from_python.jpg') + with f1: + f1.index_ndarray(np.random.random([5, 5]), output_fn=print) + + f2 = Flow.load_config('yaml/flow-v1.0-syntax.yml') + f2.plot(Path(tmpdir) / 'from_yaml.jpg') + assert filecmp.cmp(Path(tmpdir) / 'from_python.jpg', + Path(tmpdir) / 'from_yaml.jpg') + + with f2: + f2.index_ndarray(np.random.random([5, 5]), output_fn=print) + + assert f1 == f2 + + +def test_load_dump_load(tmpdir): + f1 = Flow.load_config('yaml/flow-legacy-syntax.yml') + f1.save_config(Path(tmpdir) / 'a0.yml') + f2 = Flow.load_config('yaml/flow-v1.0-syntax.yml') + f2.save_config(Path(tmpdir) / 'a1.yml') diff --git a/tests/unit/flow/yaml/flow-legacy-syntax.yml b/tests/unit/flow/yaml/flow-legacy-syntax.yml new file mode 100644 index 0000000000000..99e4be041de20 --- /dev/null +++ b/tests/unit/flow/yaml/flow-legacy-syntax.yml @@ -0,0 +1,12 @@ +!Flow +pods: + pod0: + uses: _pass + needs: gateway + pod1: + uses: _pass + needs: gateway + pod2: + uses: _merge + needs: [pod1, pod0] + diff --git a/tests/unit/flow/yaml/flow-v1-syntax.yml b/tests/unit/flow/yaml/flow-v1-syntax.yml new file mode 100644 index 0000000000000..83c7e4f123ce3 --- /dev/null +++ b/tests/unit/flow/yaml/flow-v1-syntax.yml @@ -0,0 +1,13 @@ +!Flow +version: '1' +pods: + - name: pod0 # notice the change here, name is now an attribute + method: add # by default method is always add, available: add, needs, inspect + uses: _pass + needs: gateway + - name: pod1 # notice the change here, name is now an attribute + method: add # by default method is always add, available: add, needs, inspect + uses: _pass + needs: gateway + - method: needs # let's try something new in Flow YAML v1: needs + needs: [pod1, pod0] \ No newline at end of file diff --git a/tests/unit/flow/yaml/flow-v1.0-syntax.yml b/tests/unit/flow/yaml/flow-v1.0-syntax.yml new file mode 100644 index 0000000000000..59929151eeef0 --- /dev/null +++ b/tests/unit/flow/yaml/flow-v1.0-syntax.yml @@ -0,0 +1,14 @@ +!Flow +version: '1.0' +pods: + - name: pod0 # notice the change here, name is now an attribute + method: add # by default method is always add, available: add, needs, inspect + uses: _pass + needs: gateway + - name: pod1 # notice the change here, name is now an attribute + method: add # by default method is always add, available: add, needs, inspect + uses: _pass + needs: gateway + - method: inspect # add an inspect node on pod1 + - method: needs # let's try something new in Flow YAML v1: needs + needs: [pod1, pod0] \ No newline at end of file diff --git a/tests/unit/flow/yaml/flow-v99-syntax.yml b/tests/unit/flow/yaml/flow-v99-syntax.yml new file mode 100644 index 0000000000000..42d7bc199b073 --- /dev/null +++ b/tests/unit/flow/yaml/flow-v99-syntax.yml @@ -0,0 +1,13 @@ +!Flow +version: '99.99' +pods: + - name: pod0 # notice the change here, name is now an attribute + method: add # by default method is always add, available: add, needs, inspect + uses: _pass + needs: gateway + - name: pod1 # notice the change here, name is now an attribute + method: add # by default method is always add, available: add, needs, inspect + uses: _pass + needs: gateway + - method: needs # let's try something new in Flow YAML v1: needs + needs: [pod1, pod0] \ No newline at end of file From f9bcbb2809ab8acd26be86f9a5e97d3d30d996b4 Mon Sep 17 00:00:00 2001 From: Han Xiao Date: Fri, 11 Dec 2020 16:33:42 +0100 Subject: [PATCH 02/11] refactor(types): move append and extend into QueryLangSet --- tests/unit/flow/test_flow_yaml_parser.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/unit/flow/test_flow_yaml_parser.py b/tests/unit/flow/test_flow_yaml_parser.py index 81dcbb59d6bc1..4e3e8695e33a3 100644 --- a/tests/unit/flow/test_flow_yaml_parser.py +++ b/tests/unit/flow/test_flow_yaml_parser.py @@ -42,6 +42,7 @@ def test_add_needs_inspect(tmpdir): def test_load_dump_load(tmpdir): + """TODO: Dumping valid yaml is out of scope of PR#1442, to do in separate PR""" f1 = Flow.load_config('yaml/flow-legacy-syntax.yml') f1.save_config(Path(tmpdir) / 'a0.yml') f2 = Flow.load_config('yaml/flow-v1.0-syntax.yml') From 0c830399f724abd448a304c911bc115cf99be402 Mon Sep 17 00:00:00 2001 From: Han Xiao Date: Fri, 11 Dec 2020 17:08:36 +0100 Subject: [PATCH 03/11] feat(flow): add needs, inspect, version to flow yaml --- jina/flow/__init__.py | 8 +-- jina/flow/parser/__init__.py | 63 ------------------ jina/flow/parser/dumper.py | 84 ------------------------ jina/flow/parser/loader.py | 62 ----------------- jina/flow/yaml_parser/__init__.py | 41 ++++++++++++ jina/flow/yaml_parser/base.py | 21 ++++++ jina/flow/yaml_parser/legacy.py | 64 ++++++++++++++++++ jina/flow/yaml_parser/v1.py | 71 ++++++++++++++++++++ tests/unit/flow/test_flow_yaml_parser.py | 4 +- 9 files changed, 203 insertions(+), 215 deletions(-) delete mode 100644 jina/flow/parser/__init__.py delete mode 100644 jina/flow/parser/dumper.py delete mode 100644 jina/flow/parser/loader.py create mode 100644 jina/flow/yaml_parser/__init__.py create mode 100644 jina/flow/yaml_parser/base.py create mode 100644 jina/flow/yaml_parser/legacy.py create mode 100644 jina/flow/yaml_parser/v1.py diff --git a/jina/flow/__init__.py b/jina/flow/__init__.py index c40d09bacd7f2..308d87a160cc5 100644 --- a/jina/flow/__init__.py +++ b/jina/flow/__init__.py @@ -87,8 +87,8 @@ def to_yaml(cls, representer, data): @staticmethod def _dump_instance_to_yaml(data): # note: we only save non-default property for the sake of clarity - from .parser import dump - return dump(data) + from .yaml_parser import get_parser + return get_parser(version=data._version).dump(data), data @classmethod def from_yaml(cls, constructor, node): @@ -143,8 +143,8 @@ def _get_instance_from_yaml(cls, constructor, node): data = ruamel.yaml.constructor.SafeConstructor.construct_mapping( constructor, node, deep=True) - from .parser import parse - return parse(data), data + from .yaml_parser import get_parser + return get_parser(version=data.get('version', None)).parse(data), data @staticmethod def _parse_endpoints(op_flow, pod_name, endpoint, connect_to_last_pod=False) -> Set: diff --git a/jina/flow/parser/__init__.py b/jina/flow/parser/__init__.py deleted file mode 100644 index 9e16ee21a482b..0000000000000 --- a/jina/flow/parser/__init__.py +++ /dev/null @@ -1,63 +0,0 @@ -import warnings -from typing import Dict, List - -from ...excepts import BadFlowYAMLVersion - -if False: - from .. import Flow - -loader_prefix = 'load_v_' -dumper_prefix = 'dump_v_' - - -def parse(data: Dict) -> 'Flow': - """Return the Flow YAML parser given the syntax version number - - :param data: flow yaml file loaded as python dict - """ - from . import loader - - version = data.get('version', 'legacy') - v = version.replace('.', '_') - p = getattr(loader, loader_prefix + v, None) - if not p: - p = getattr(loader, loader_prefix + v.split('_')[0], None) - warnings.warn(f'can not find the parser for {version}, ' - f'will use the parser for version: "{v.split("_")[0]}"', UserWarning) - if not p: - raise BadFlowYAMLVersion(f'{version} is not a valid version number') - obj = p(data) - obj._version = version - obj.logger.success(f'successfully built a Flow from a YAML version: {version}') - return obj - - -def dump(data: 'Flow') -> Dict: - """Return the dictionary given a versioned flow object - - :param data: versioned flow object - """ - from . import dumper - version = data._version - v = version.replace('.', '_') - p = getattr(dumper, dumper_prefix + v, None) - if not p: - p = getattr(dumper, dumper_prefix + v.split('_')[0], None) - warnings.warn(f'can not find the dumper for {version}, ' - f'will use the dumper for version: "{v.split("_")[0]}"', UserWarning) - if not p: - raise BadFlowYAMLVersion(f'{version} is not a valid version number') - return p(data) - - -def get_support_versions() -> List[str]: - """List all supported versions - - :return: supported versions sorted alphabetically - """ - from . import loader - result = [] - for v in dir(loader): - if v.startswith(loader_prefix): - result.append(v.replace(loader_prefix, '').replace('_', '.')) - return list(sorted(result)) diff --git a/jina/flow/parser/dumper.py b/jina/flow/parser/dumper.py deleted file mode 100644 index 869d7a2317a24..0000000000000 --- a/jina/flow/parser/dumper.py +++ /dev/null @@ -1,84 +0,0 @@ -""" -# loader function format - - def dump_v_MAJOR[_MINOR](data) - e.g. - - def dump_v_1_1(data) - - def dump_v_1(data) - -# match priority - if version is available: - - dump_v_MAJOR_MINOR - - dump_v_MAJOR - - throw BadFlowYAMLVersion - otherwise: - - dump_v_legacy -""" - -import argparse -from typing import Dict, Any - -from .. import Flow -from ...parser import set_pod_parser - - -def _get_taboo(): - """Get a set of keys that should not be dumped""" - return {k.dest for k in set_pod_parser()._actions if k.help == argparse.SUPPRESS} - - -def dump_v_legacy(data: 'Flow') -> Dict[str, Any]: - r = {} - if data._version: - r['version'] = data._version - - if data._kwargs: - r['with'] = data._kwargs - - if data._pod_nodes: - r['pods'] = {} - - if 'gateway' in data._pod_nodes: - # always dump gateway as the first pod, if exist - r['pods']['gateway'] = {} - - for k, v in data._pod_nodes.items(): - if k == 'gateway': - continue - - kwargs = {'needs': list(v.needs)} if v.needs else {} - kwargs.update(v._kwargs) - - if 'name' in kwargs: - kwargs.pop('name') - - r['pods'][k] = kwargs - return r - - -def dump_v_1(data: 'Flow') -> Dict[str, Any]: - r = {} - if data._version: - r['version'] = data._version - - if data._kwargs: - r['with'] = data._kwargs - - if data._pod_nodes: - r['pods'] = [] - - last_name = 'gateway' - for k, v in data._pod_nodes.items(): - if k == 'gateway': - continue - kwargs = {} - # only add "needs" when the value is not the last pod name - if list(v.needs) != [last_name]: - kwargs = {'needs': list(v.needs)} - kwargs.update(v._kwargs) - for t in _get_taboo(): - if t in kwargs: - kwargs.pop(t) - last_name = kwargs['name'] - r['pods'].append(kwargs) - return r diff --git a/jina/flow/parser/loader.py b/jina/flow/parser/loader.py deleted file mode 100644 index 061e1acdeb273..0000000000000 --- a/jina/flow/parser/loader.py +++ /dev/null @@ -1,62 +0,0 @@ -""" -# loader function format - - def load_v_MAJOR[_MINOR](data) - e.g. - - def load_v_1_1(data) - - def load_v_1(data) - -# match priority - if version is available: - - load_v_MAJOR_MINOR - - load_v_MAJOR - - throw BadFlowYAMLVersion - otherwise: - - load_v_legacy -""" - -from typing import Dict, Any - -from .. import Flow -from ...helper import expand_env_var - - -def load_v_legacy(data): - p = data.get('with', {}) # type: Dict[str, Any] - a = p.pop('args') if 'args' in p else () - k = p.pop('kwargs') if 'kwargs' in p else {} - # maybe there are some hanging kwargs in "parameters" - tmp_a = (expand_env_var(v) for v in a) - tmp_p = {kk: expand_env_var(vv) for kk, vv in {**k, **p}.items()} - obj = Flow(*tmp_a, **tmp_p) - - pp = data.get('pods', {}) - for pod_name, pod_attr in pp.items(): - p_pod_attr = {kk: expand_env_var(vv) for kk, vv in pod_attr.items()} - if pod_name != 'gateway': - # ignore gateway when reading, it will be added during build() - obj.add(name=pod_name, **p_pod_attr, copy_flow=False) - # if node.tag in {'!CompoundExecutor'}: - # os.environ['JINA_WARN_UNNAMED'] = 'YES' - return obj - - -def load_v_1(data): - p = data.get('with', {}) # type: Dict[str, Any] - a = p.pop('args') if 'args' in p else () - k = p.pop('kwargs') if 'kwargs' in p else {} - # maybe there are some hanging kwargs in "parameters" - tmp_a = (expand_env_var(v) for v in a) - tmp_p = {kk: expand_env_var(vv) for kk, vv in {**k, **p}.items()} - obj = Flow(*tmp_a, **tmp_p) - - pp = data.get('pods', []) - for pods in pp: - p_pod_attr = {kk: expand_env_var(vv) for kk, vv in pods.items()} - # in v1 YAML, flow is an optional argument - if p_pod_attr.get('name', None) != 'gateway': - # ignore gateway when reading, it will be added during build() - method = p_pod_attr.get('method', 'add') - # support methods: add, needs, inspect - getattr(obj, method)(**p_pod_attr, copy_flow=False) - return obj diff --git a/jina/flow/yaml_parser/__init__.py b/jina/flow/yaml_parser/__init__.py new file mode 100644 index 0000000000000..94a8ff8e87ee5 --- /dev/null +++ b/jina/flow/yaml_parser/__init__.py @@ -0,0 +1,41 @@ +from typing import List, Optional, Type + +from .base import VersionedYamlParser +from ...excepts import BadFlowYAMLVersion + + +def _get_all_parser(): + from .legacy import LegacyParser + from .v1 import V1Parser + return [V1Parser, LegacyParser] + + +def get_parser(version: Optional[str]) -> 'VersionedYamlParser': + """ Get parser given the YAML version + + :param version: yaml version number in "MAJOR[.MINOR]" format + :return: + """ + all_parsers = _get_all_parser() + if version: + for p in all_parsers: + if p.version == version: + return p() + for p in all_parsers: + # fallback to major + if version.startswith(p.version): + return p() + raise BadFlowYAMLVersion(f'{version} is not a valid version number') + else: + # fallback to legacy parser + from .legacy import LegacyParser + return LegacyParser() + + +def get_supported_versions() -> List[str]: + """List all supported versions + + :return: supported versions sorted alphabetically + """ + all_parsers = _get_all_parser() + return list(sorted(p.version for p in all_parsers)) diff --git a/jina/flow/yaml_parser/base.py b/jina/flow/yaml_parser/base.py new file mode 100644 index 0000000000000..93429f483b1d2 --- /dev/null +++ b/jina/flow/yaml_parser/base.py @@ -0,0 +1,21 @@ +from typing import Dict + +from .. import Flow + + +class VersionedYamlParser: + version = 'legacy' # the version number this parser designed for + + def parse(self, data: Dict) -> 'Flow': + """Return the Flow YAML parser given the syntax version number + + :param data: flow yaml file loaded as python dict + """ + raise NotImplementedError + + def dump(self, data: 'Flow') -> Dict: + """Return the dictionary given a versioned flow object + + :param data: versioned flow object + """ + raise NotImplementedError diff --git a/jina/flow/yaml_parser/legacy.py b/jina/flow/yaml_parser/legacy.py new file mode 100644 index 0000000000000..7030744592565 --- /dev/null +++ b/jina/flow/yaml_parser/legacy.py @@ -0,0 +1,64 @@ +from typing import Dict, Any + +from .base import VersionedYamlParser +from .. import Flow +from ...helper import expand_env_var + + +class LegacyParser(VersionedYamlParser): + version = 'legacy' # the version number this parser designed for + + def parse(self, data: Dict) -> 'Flow': + """Return the Flow YAML parser given the syntax version number + + :param data: flow yaml file loaded as python dict + """ + p = data.get('with', {}) # type: Dict[str, Any] + a = p.pop('args') if 'args' in p else () + k = p.pop('kwargs') if 'kwargs' in p else {} + # maybe there are some hanging kwargs in "parameters" + tmp_a = (expand_env_var(v) for v in a) + tmp_p = {kk: expand_env_var(vv) for kk, vv in {**k, **p}.items()} + obj = Flow(*tmp_a, **tmp_p) + + pp = data.get('pods', {}) + for pod_name, pod_attr in pp.items(): + p_pod_attr = {kk: expand_env_var(vv) for kk, vv in pod_attr.items()} + if pod_name != 'gateway': + # ignore gateway when reading, it will be added during build() + obj.add(name=pod_name, **p_pod_attr, copy_flow=False) + # if node.tag in {'!CompoundExecutor'}: + # os.environ['JINA_WARN_UNNAMED'] = 'YES' + return obj + + def dump(self, data: 'Flow') -> Dict: + """Return the dictionary given a versioned flow object + + :param data: versioned flow object + """ + r = {} + if data._version: + r['version'] = data._version + + if data._kwargs: + r['with'] = data._kwargs + + if data._pod_nodes: + r['pods'] = {} + + if 'gateway' in data._pod_nodes: + # always dump gateway as the first pod, if exist + r['pods']['gateway'] = {} + + for k, v in data._pod_nodes.items(): + if k == 'gateway': + continue + + kwargs = {'needs': list(v.needs)} if v.needs else {} + kwargs.update(v._kwargs) + + if 'name' in kwargs: + kwargs.pop('name') + + r['pods'][k] = kwargs + return r diff --git a/jina/flow/yaml_parser/v1.py b/jina/flow/yaml_parser/v1.py new file mode 100644 index 0000000000000..e0a61717a9c79 --- /dev/null +++ b/jina/flow/yaml_parser/v1.py @@ -0,0 +1,71 @@ +import argparse +from typing import Dict, Any + +from .base import VersionedYamlParser +from .. import Flow +from ...helper import expand_env_var +from ...parser import set_pod_parser + + +def _get_taboo(): + """Get a set of keys that should not be dumped""" + return {k.dest for k in set_pod_parser()._actions if k.help == argparse.SUPPRESS} + + +class V1Parser(VersionedYamlParser): + version = '1' # the version number this parser designed for + + def parse(self, data: Dict) -> 'Flow': + """Return the Flow YAML parser given the syntax version number + + :param data: flow yaml file loaded as python dict + """ + p = data.get('with', {}) # type: Dict[str, Any] + a = p.pop('args') if 'args' in p else () + k = p.pop('kwargs') if 'kwargs' in p else {} + # maybe there are some hanging kwargs in "parameters" + tmp_a = (expand_env_var(v) for v in a) + tmp_p = {kk: expand_env_var(vv) for kk, vv in {**k, **p}.items()} + obj = Flow(*tmp_a, **tmp_p) + + pp = data.get('pods', []) + for pods in pp: + p_pod_attr = {kk: expand_env_var(vv) for kk, vv in pods.items()} + # in v1 YAML, flow is an optional argument + if p_pod_attr.get('name', None) != 'gateway': + # ignore gateway when reading, it will be added during build() + method = p_pod_attr.get('method', 'add') + # support methods: add, needs, inspect + getattr(obj, method)(**p_pod_attr, copy_flow=False) + return obj + + def dump(self, data: 'Flow') -> Dict: + """Return the dictionary given a versioned flow object + + :param data: versioned flow object + """ + r = {} + if data._version: + r['version'] = data._version + + if data._kwargs: + r['with'] = data._kwargs + + if data._pod_nodes: + r['pods'] = [] + + last_name = 'gateway' + for k, v in data._pod_nodes.items(): + if k == 'gateway': + continue + kwargs = {} + # only add "needs" when the value is not the last pod name + if list(v.needs) != [last_name]: + kwargs = {'needs': list(v.needs)} + kwargs.update(v._kwargs) + for t in _get_taboo(): + if t in kwargs: + kwargs.pop(t) + last_name = kwargs['name'] + r['pods'].append(kwargs) + return r diff --git a/tests/unit/flow/test_flow_yaml_parser.py b/tests/unit/flow/test_flow_yaml_parser.py index 4e3e8695e33a3..0bafa91fe0ca7 100644 --- a/tests/unit/flow/test_flow_yaml_parser.py +++ b/tests/unit/flow/test_flow_yaml_parser.py @@ -6,11 +6,11 @@ from jina import Flow from jina.excepts import BadFlowYAMLVersion -from jina.flow.parser import get_support_versions +from jina.flow.yaml_parser import get_supported_versions def test_support_versions(): - assert get_support_versions() == ['1', 'legacy'] + assert get_supported_versions() == ['1', 'legacy'] def test_load_legacy_and_v1(): From ff5adba45274edc68e7082a405dfae44f57d7866 Mon Sep 17 00:00:00 2001 From: Han Xiao Date: Fri, 11 Dec 2020 17:14:50 +0100 Subject: [PATCH 04/11] feat(flow): add needs, inspect, version to flow yaml --- jina/flow/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/jina/flow/__init__.py b/jina/flow/__init__.py index 308d87a160cc5..7ae1851c6f992 100644 --- a/jina/flow/__init__.py +++ b/jina/flow/__init__.py @@ -55,7 +55,7 @@ def __init__(self, args: Optional['argparse.Namespace'] = None, **kwargs): """ super().__init__() - self._version = '' # YAML version number + self._version = '1' # YAML version number, this will be later overrided if YAML config says the other way self._pod_nodes = OrderedDict() # type: Dict[str, 'FlowPod'] self._inspect_pods = {} # type: Dict[str, str] self._build_level = FlowBuildLevel.EMPTY @@ -88,7 +88,7 @@ def to_yaml(cls, representer, data): def _dump_instance_to_yaml(data): # note: we only save non-default property for the sake of clarity from .yaml_parser import get_parser - return get_parser(version=data._version).dump(data), data + return get_parser(version=data._version).dump(data) @classmethod def from_yaml(cls, constructor, node): From 2616d28741d45379b36e332f5895b5cd21fc4193 Mon Sep 17 00:00:00 2001 From: Han Xiao Date: Fri, 11 Dec 2020 17:16:58 +0100 Subject: [PATCH 05/11] feat(flow): add needs, inspect, version to flow yaml --- tests/unit/flow/test_flow_yaml_parser.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/unit/flow/test_flow_yaml_parser.py b/tests/unit/flow/test_flow_yaml_parser.py index 0bafa91fe0ca7..59620ce266794 100644 --- a/tests/unit/flow/test_flow_yaml_parser.py +++ b/tests/unit/flow/test_flow_yaml_parser.py @@ -26,12 +26,12 @@ def test_load_legacy_and_v1(): def test_add_needs_inspect(tmpdir): f1 = (Flow().add(name='pod0', needs='gateway').add(name='pod1', needs='gateway').inspect().needs(['pod0', 'pod1'])) - f1.plot(Path(tmpdir) / 'from_python.jpg') + f1.plot(str(Path(tmpdir) / 'from_python.jpg')) with f1: f1.index_ndarray(np.random.random([5, 5]), output_fn=print) f2 = Flow.load_config('yaml/flow-v1.0-syntax.yml') - f2.plot(Path(tmpdir) / 'from_yaml.jpg') + f2.plot(str(Path(tmpdir) / 'from_yaml.jpg')) assert filecmp.cmp(Path(tmpdir) / 'from_python.jpg', Path(tmpdir) / 'from_yaml.jpg') @@ -44,6 +44,6 @@ def test_add_needs_inspect(tmpdir): def test_load_dump_load(tmpdir): """TODO: Dumping valid yaml is out of scope of PR#1442, to do in separate PR""" f1 = Flow.load_config('yaml/flow-legacy-syntax.yml') - f1.save_config(Path(tmpdir) / 'a0.yml') + f1.save_config(str(Path(tmpdir) / 'a0.yml')) f2 = Flow.load_config('yaml/flow-v1.0-syntax.yml') - f2.save_config(Path(tmpdir) / 'a1.yml') + f2.save_config(str(Path(tmpdir) / 'a1.yml')) From d7c1b53faf31322dcdf54f3d872fcb6e19bf7f68 Mon Sep 17 00:00:00 2001 From: Han Xiao Date: Fri, 11 Dec 2020 17:21:04 +0100 Subject: [PATCH 06/11] feat(flow): add needs, inspect, version to flow yaml --- tests/unit/flow/test_flow_yaml_parser.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/unit/flow/test_flow_yaml_parser.py b/tests/unit/flow/test_flow_yaml_parser.py index 59620ce266794..0ebea76937fea 100644 --- a/tests/unit/flow/test_flow_yaml_parser.py +++ b/tests/unit/flow/test_flow_yaml_parser.py @@ -26,14 +26,10 @@ def test_load_legacy_and_v1(): def test_add_needs_inspect(tmpdir): f1 = (Flow().add(name='pod0', needs='gateway').add(name='pod1', needs='gateway').inspect().needs(['pod0', 'pod1'])) - f1.plot(str(Path(tmpdir) / 'from_python.jpg')) with f1: f1.index_ndarray(np.random.random([5, 5]), output_fn=print) f2 = Flow.load_config('yaml/flow-v1.0-syntax.yml') - f2.plot(str(Path(tmpdir) / 'from_yaml.jpg')) - assert filecmp.cmp(Path(tmpdir) / 'from_python.jpg', - Path(tmpdir) / 'from_yaml.jpg') with f2: f2.index_ndarray(np.random.random([5, 5]), output_fn=print) From 2169eebc27ce517fc4caeaa3e27db384ffd5ee7e Mon Sep 17 00:00:00 2001 From: Han Xiao Date: Fri, 11 Dec 2020 17:26:07 +0100 Subject: [PATCH 07/11] feat(flow): add needs, inspect, version to flow yaml --- jina/flow/yaml_parser/__init__.py | 4 ++-- jina/flow/yaml_parser/base.py | 10 +++++++++- tests/unit/flow/test_flow_yaml_parser.py | 1 - 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/jina/flow/yaml_parser/__init__.py b/jina/flow/yaml_parser/__init__.py index 94a8ff8e87ee5..46ec49e50efcd 100644 --- a/jina/flow/yaml_parser/__init__.py +++ b/jina/flow/yaml_parser/__init__.py @@ -1,4 +1,4 @@ -from typing import List, Optional, Type +from typing import List, Optional from .base import VersionedYamlParser from ...excepts import BadFlowYAMLVersion @@ -23,7 +23,7 @@ def get_parser(version: Optional[str]) -> 'VersionedYamlParser': return p() for p in all_parsers: # fallback to major - if version.startswith(p.version): + if version.split('.')[0] == p.version: return p() raise BadFlowYAMLVersion(f'{version} is not a valid version number') else: diff --git a/jina/flow/yaml_parser/base.py b/jina/flow/yaml_parser/base.py index 93429f483b1d2..10c84d03fc90c 100644 --- a/jina/flow/yaml_parser/base.py +++ b/jina/flow/yaml_parser/base.py @@ -4,7 +4,15 @@ class VersionedYamlParser: - version = 'legacy' # the version number this parser designed for + """ Flow YAML parser for specific version + + Every :class:`VersionedYamlParser` must implement two methods and one class attribute: + - :meth:`parse`: to load data (in :class:`dict`) into a :class:`Flow` object + - :meth:`dump`: to dump a :class:`Flow` object into a :class:`dict` + - :attr:`version`: version number in :class:`str` in format ``MAJOR.[MINOR]`` + """ + + version = 'legacy' #: the version number this parser designed for def parse(self, data: Dict) -> 'Flow': """Return the Flow YAML parser given the syntax version number diff --git a/tests/unit/flow/test_flow_yaml_parser.py b/tests/unit/flow/test_flow_yaml_parser.py index 0ebea76937fea..5ce55ca2bcc5b 100644 --- a/tests/unit/flow/test_flow_yaml_parser.py +++ b/tests/unit/flow/test_flow_yaml_parser.py @@ -1,4 +1,3 @@ -import filecmp from pathlib import Path import numpy as np From 138fc4492c8496f999a391cdaf02a34c11909240 Mon Sep 17 00:00:00 2001 From: Han Xiao Date: Fri, 11 Dec 2020 17:35:03 +0100 Subject: [PATCH 08/11] refactor(types): move append and extend into QueryLangSet --- jina/flow/yaml_parser/__init__.py | 6 ++++++ jina/flow/yaml_parser/legacy.py | 3 +-- jina/flow/yaml_parser/v1.py | 31 +++++++++++++++++++++++++++++++ 3 files changed, 38 insertions(+), 2 deletions(-) diff --git a/jina/flow/yaml_parser/__init__.py b/jina/flow/yaml_parser/__init__.py index 46ec49e50efcd..2f3a4d236073f 100644 --- a/jina/flow/yaml_parser/__init__.py +++ b/jina/flow/yaml_parser/__init__.py @@ -1,3 +1,4 @@ +import warnings from typing import List, Optional from .base import VersionedYamlParser @@ -24,10 +25,15 @@ def get_parser(version: Optional[str]) -> 'VersionedYamlParser': for p in all_parsers: # fallback to major if version.split('.')[0] == p.version: + warnings.warn(f'can not find parser for version: {version}, ' + f'fallback to parser for version: {p.version}', UserWarning) return p() raise BadFlowYAMLVersion(f'{version} is not a valid version number') else: # fallback to legacy parser + warnings.warn(f'can not find parser for version: {version}, ' + f'fallback to legacy parser. ' + f'this usually mean you are using a depreciated YAML format.', DeprecationWarning) from .legacy import LegacyParser return LegacyParser() diff --git a/jina/flow/yaml_parser/legacy.py b/jina/flow/yaml_parser/legacy.py index 7030744592565..591e1f8f7fb45 100644 --- a/jina/flow/yaml_parser/legacy.py +++ b/jina/flow/yaml_parser/legacy.py @@ -27,8 +27,7 @@ def parse(self, data: Dict) -> 'Flow': if pod_name != 'gateway': # ignore gateway when reading, it will be added during build() obj.add(name=pod_name, **p_pod_attr, copy_flow=False) - # if node.tag in {'!CompoundExecutor'}: - # os.environ['JINA_WARN_UNNAMED'] = 'YES' + return obj def dump(self, data: 'Flow') -> Dict: diff --git a/jina/flow/yaml_parser/v1.py b/jina/flow/yaml_parser/v1.py index e0a61717a9c79..e336a8dbc77b5 100644 --- a/jina/flow/yaml_parser/v1.py +++ b/jina/flow/yaml_parser/v1.py @@ -13,6 +13,37 @@ def _get_taboo(): class V1Parser(VersionedYamlParser): + """V1Parser introduces new syntax and features: + + - It has a top-level field ``version`` + - ``pods`` is now a List of Dict (rather than a Dict as prev.) + - ``name`` is now optional + - new field ``method`` can be used to specify how to add this Pod into the Flow, availables are: + - ``add``: (default) equal to `Flow.add(...)` + - ``needs``: (default) equal to `Flow.needs(...)` + - ``inspect``: (default) equal to `Flow.inspect(...)` + + An example V1 YAML config can be found below: + .. highlight:: yaml + .. code-block:: yaml + + !Flow + version: '1.0' + pods: + - name: pod0 # notice the change here, name is now an attribute + method: add # by default method is always add, available: add, needs, inspect + uses: _pass + needs: gateway + - name: pod1 # notice the change here, name is now an attribute + method: add # by default method is always add, available: add, needs, inspect + uses: _pass + needs: gateway + - method: inspect # add an inspect node on pod1 + - method: needs # let's try something new in Flow YAML v1: needs + needs: [pod1, pod0] + + + """ version = '1' # the version number this parser designed for def parse(self, data: Dict) -> 'Flow': From 3b471573348a719cd5834e8bb23d8ddc0133493f Mon Sep 17 00:00:00 2001 From: Han Xiao Date: Fri, 11 Dec 2020 17:35:37 +0100 Subject: [PATCH 09/11] refactor(types): move append and extend into QueryLangSet --- jina/flow/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jina/flow/__init__.py b/jina/flow/__init__.py index 7ae1851c6f992..5631dc300c6ff 100644 --- a/jina/flow/__init__.py +++ b/jina/flow/__init__.py @@ -55,7 +55,7 @@ def __init__(self, args: Optional['argparse.Namespace'] = None, **kwargs): """ super().__init__() - self._version = '1' # YAML version number, this will be later overrided if YAML config says the other way + self._version = '1' #: YAML version number, this will be later overridden if YAML config says the other way self._pod_nodes = OrderedDict() # type: Dict[str, 'FlowPod'] self._inspect_pods = {} # type: Dict[str, str] self._build_level = FlowBuildLevel.EMPTY From 95e39c5820079018fd6635f9b24b870e6184ea4b Mon Sep 17 00:00:00 2001 From: Han Xiao Date: Fri, 11 Dec 2020 19:01:19 +0100 Subject: [PATCH 10/11] feat(flow): add needs, inspect, version to flow yaml --- jina/peapods/pods/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/jina/peapods/pods/__init__.py b/jina/peapods/pods/__init__.py index 4692e60e11f45..ca939f30ebeea 100644 --- a/jina/peapods/pods/__init__.py +++ b/jina/peapods/pods/__init__.py @@ -117,7 +117,6 @@ def _parse_args(self, args: Namespace) -> Dict[str, Optional[Union[List[Namespac peas_args['tail'] = _copy_to_tail_args(args) peas_args['peas'] = _set_peas_args(args, peas_args.get('head', None), peas_args.get('tail', None)) else: - _set_after_to_pass(args) self.is_head_router = False self.is_tail_router = False peas_args['peas'] = [args] From 02775c78aaadb0222a9382ea6a7eacb7070e402a Mon Sep 17 00:00:00 2001 From: Han Xiao Date: Fri, 11 Dec 2020 19:10:23 +0100 Subject: [PATCH 11/11] refactor(types): move append and extend into QueryLangSet --- jina/flow/yaml_parser/__init__.py | 2 ++ jina/resources/helloworld.flow.index.yml | 5 +++-- jina/resources/helloworld.flow.query.yml | 5 +++-- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/jina/flow/yaml_parser/__init__.py b/jina/flow/yaml_parser/__init__.py index 2f3a4d236073f..ab7d27a7dd102 100644 --- a/jina/flow/yaml_parser/__init__.py +++ b/jina/flow/yaml_parser/__init__.py @@ -19,6 +19,8 @@ def get_parser(version: Optional[str]) -> 'VersionedYamlParser': """ all_parsers = _get_all_parser() if version: + if isinstance(version, float) or isinstance(version, int): + version = str(version) for p in all_parsers: if p.version == version: return p() diff --git a/jina/resources/helloworld.flow.index.yml b/jina/resources/helloworld.flow.index.yml index 14ad0c90581fd..cb7b953d8a3f6 100644 --- a/jina/resources/helloworld.flow.index.yml +++ b/jina/resources/helloworld.flow.index.yml @@ -1,12 +1,13 @@ !Flow +version: '1.0' with: logserver: $WITH_LOGSERVER compress_hwm: 1024 pods: - encode: + - name: encode uses: $RESOURCE_DIR/helloworld.encoder.yml parallel: $PARALLEL - index: + - name: index uses: $RESOURCE_DIR/helloworld.indexer.yml shards: $SHARDS separated_workspace: true diff --git a/jina/resources/helloworld.flow.query.yml b/jina/resources/helloworld.flow.query.yml index fbceef1332829..303e53d9134e8 100644 --- a/jina/resources/helloworld.flow.query.yml +++ b/jina/resources/helloworld.flow.query.yml @@ -1,13 +1,14 @@ !Flow +version: '1.0' with: read_only: true # better add this in the query time logserver: $WITH_LOGSERVER compress_hwm: 1024 pods: - encode: + - name: encode uses: $RESOURCE_DIR/helloworld.encoder.yml parallel: $PARALLEL - index: + - name: index uses: $RESOURCE_DIR/helloworld.indexer.yml shards: $SHARDS separated_workspace: true