Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(flow): add needs, inspect, version to flow yaml #1442

Merged
merged 11 commits into from Dec 11, 2020
4 changes: 4 additions & 0 deletions jina/excepts.py
Expand Up @@ -177,3 +177,7 @@ class LengthMismatchException(Exception):

class ImageAlreadyExists(Exception):
""" Exception when an image with the name, module version, and Jina version already exists on the Hub"""


class BadFlowYAMLVersion(Exception):
""" Exception when Flow YAML config specifies a wrong version number"""
50 changes: 5 additions & 45 deletions jina/flow/__init__.py
Expand Up @@ -55,6 +55,7 @@ def __init__(self, args: Optional['argparse.Namespace'] = None, **kwargs):

"""
super().__init__()
self._version = '1' #: YAML version number, this will be later overridden if YAML config says the other way
self._pod_nodes = OrderedDict() # type: Dict[str, 'FlowPod']
self._inspect_pods = {} # type: Dict[str, str]
self._build_level = FlowBuildLevel.EMPTY
Expand Down Expand Up @@ -86,30 +87,8 @@ def to_yaml(cls, representer, data):
@staticmethod
def _dump_instance_to_yaml(data):
# note: we only save non-default property for the sake of clarity
r = {}

if data._kwargs:
r['with'] = data._kwargs

if data._pod_nodes:
r['pods'] = {}

if 'gateway' in data._pod_nodes:
# always dump gateway as the first pod, if exist
r['pods']['gateway'] = {}

for k, v in data._pod_nodes.items():
if k == 'gateway':
continue

kwargs = {'needs': list(v.needs)} if v.needs else {}
kwargs.update(v._kwargs)

if 'name' in kwargs:
kwargs.pop('name')

r['pods'][k] = kwargs
return r
from .yaml_parser import get_parser
return get_parser(version=data._version).dump(data)

@classmethod
def from_yaml(cls, constructor, node):
Expand Down Expand Up @@ -164,27 +143,8 @@ def _get_instance_from_yaml(cls, constructor, node):
data = ruamel.yaml.constructor.SafeConstructor.construct_mapping(
constructor, node, deep=True)

p = data.get('with', {}) # type: Dict[str, Any]
a = p.pop('args') if 'args' in p else ()
k = p.pop('kwargs') if 'kwargs' in p else {}
# maybe there are some hanging kwargs in "parameters"
tmp_a = (expand_env_var(v) for v in a)
tmp_p = {kk: expand_env_var(vv) for kk, vv in {**k, **p}.items()}
obj = cls(*tmp_a, **tmp_p)

pp = data.get('pods', {})
for pod_name, pod_attr in pp.items():
p_pod_attr = {kk: expand_env_var(vv) for kk, vv in pod_attr.items()}
if pod_name != 'gateway':
# ignore gateway when reading, it will be added during build()
obj.add(name=pod_name, **p_pod_attr, copy_flow=False)

obj.logger.success(f'successfully built {cls.__name__} from a yaml config')

# if node.tag in {'!CompoundExecutor'}:
# os.environ['JINA_WARN_UNNAMED'] = 'YES'

return obj, data
from .yaml_parser import get_parser
return get_parser(version=data.get('version', None)).parse(data), data

@staticmethod
def _parse_endpoints(op_flow, pod_name, endpoint, connect_to_last_pod=False) -> Set:
Expand Down
49 changes: 49 additions & 0 deletions jina/flow/yaml_parser/__init__.py
@@ -0,0 +1,49 @@
import warnings
from typing import List, Optional

from .base import VersionedYamlParser
from ...excepts import BadFlowYAMLVersion


def _get_all_parser():
from .legacy import LegacyParser
from .v1 import V1Parser
return [V1Parser, LegacyParser]


def get_parser(version: Optional[str]) -> 'VersionedYamlParser':
""" Get parser given the YAML version

:param version: yaml version number in "MAJOR[.MINOR]" format
:return:
"""
all_parsers = _get_all_parser()
if version:
if isinstance(version, float) or isinstance(version, int):
version = str(version)
for p in all_parsers:
if p.version == version:
return p()
for p in all_parsers:
# fallback to major
if version.split('.')[0] == p.version:
warnings.warn(f'can not find parser for version: {version}, '
f'fallback to parser for version: {p.version}', UserWarning)
return p()
raise BadFlowYAMLVersion(f'{version} is not a valid version number')
else:
# fallback to legacy parser
warnings.warn(f'can not find parser for version: {version}, '
f'fallback to legacy parser. '
f'this usually mean you are using a depreciated YAML format.', DeprecationWarning)
from .legacy import LegacyParser
hanxiao marked this conversation as resolved.
Show resolved Hide resolved
return LegacyParser()


def get_supported_versions() -> List[str]:
"""List all supported versions

:return: supported versions sorted alphabetically
"""
all_parsers = _get_all_parser()
return list(sorted(p.version for p in all_parsers))
29 changes: 29 additions & 0 deletions jina/flow/yaml_parser/base.py
@@ -0,0 +1,29 @@
from typing import Dict

from .. import Flow


class VersionedYamlParser:
""" Flow YAML parser for specific version

Every :class:`VersionedYamlParser` must implement two methods and one class attribute:
- :meth:`parse`: to load data (in :class:`dict`) into a :class:`Flow` object
- :meth:`dump`: to dump a :class:`Flow` object into a :class:`dict`
- :attr:`version`: version number in :class:`str` in format ``MAJOR.[MINOR]``
"""

version = 'legacy' #: the version number this parser designed for

def parse(self, data: Dict) -> 'Flow':
"""Return the Flow YAML parser given the syntax version number

:param data: flow yaml file loaded as python dict
"""
raise NotImplementedError

def dump(self, data: 'Flow') -> Dict:
"""Return the dictionary given a versioned flow object

:param data: versioned flow object
"""
raise NotImplementedError
63 changes: 63 additions & 0 deletions jina/flow/yaml_parser/legacy.py
@@ -0,0 +1,63 @@
from typing import Dict, Any

from .base import VersionedYamlParser
from .. import Flow
from ...helper import expand_env_var


class LegacyParser(VersionedYamlParser):
version = 'legacy' # the version number this parser designed for

def parse(self, data: Dict) -> 'Flow':
"""Return the Flow YAML parser given the syntax version number

:param data: flow yaml file loaded as python dict
"""
p = data.get('with', {}) # type: Dict[str, Any]
a = p.pop('args') if 'args' in p else ()
k = p.pop('kwargs') if 'kwargs' in p else {}
# maybe there are some hanging kwargs in "parameters"
tmp_a = (expand_env_var(v) for v in a)
tmp_p = {kk: expand_env_var(vv) for kk, vv in {**k, **p}.items()}
obj = Flow(*tmp_a, **tmp_p)

pp = data.get('pods', {})
for pod_name, pod_attr in pp.items():
p_pod_attr = {kk: expand_env_var(vv) for kk, vv in pod_attr.items()}
if pod_name != 'gateway':
# ignore gateway when reading, it will be added during build()
obj.add(name=pod_name, **p_pod_attr, copy_flow=False)

return obj

def dump(self, data: 'Flow') -> Dict:
"""Return the dictionary given a versioned flow object

:param data: versioned flow object
"""
r = {}
if data._version:
r['version'] = data._version

if data._kwargs:
r['with'] = data._kwargs

if data._pod_nodes:
r['pods'] = {}

if 'gateway' in data._pod_nodes:
# always dump gateway as the first pod, if exist
r['pods']['gateway'] = {}

for k, v in data._pod_nodes.items():
if k == 'gateway':
continue

kwargs = {'needs': list(v.needs)} if v.needs else {}
kwargs.update(v._kwargs)

if 'name' in kwargs:
kwargs.pop('name')

r['pods'][k] = kwargs
return r
102 changes: 102 additions & 0 deletions jina/flow/yaml_parser/v1.py
@@ -0,0 +1,102 @@
import argparse
from typing import Dict, Any

from .base import VersionedYamlParser
from .. import Flow
from ...helper import expand_env_var
from ...parser import set_pod_parser


def _get_taboo():
"""Get a set of keys that should not be dumped"""
return {k.dest for k in set_pod_parser()._actions if k.help == argparse.SUPPRESS}


class V1Parser(VersionedYamlParser):
"""V1Parser introduces new syntax and features:

- It has a top-level field ``version``
- ``pods`` is now a List of Dict (rather than a Dict as prev.)
- ``name`` is now optional
- new field ``method`` can be used to specify how to add this Pod into the Flow, availables are:
- ``add``: (default) equal to `Flow.add(...)`
- ``needs``: (default) equal to `Flow.needs(...)`
- ``inspect``: (default) equal to `Flow.inspect(...)`

An example V1 YAML config can be found below:
.. highlight:: yaml
.. code-block:: yaml

!Flow
version: '1.0'
pods:
- name: pod0 # notice the change here, name is now an attribute
method: add # by default method is always add, available: add, needs, inspect
uses: _pass
needs: gateway
- name: pod1 # notice the change here, name is now an attribute
method: add # by default method is always add, available: add, needs, inspect
uses: _pass
needs: gateway
- method: inspect # add an inspect node on pod1
- method: needs # let's try something new in Flow YAML v1: needs
needs: [pod1, pod0]


"""
version = '1' # the version number this parser designed for

def parse(self, data: Dict) -> 'Flow':
"""Return the Flow YAML parser given the syntax version number

:param data: flow yaml file loaded as python dict
"""
p = data.get('with', {}) # type: Dict[str, Any]
a = p.pop('args') if 'args' in p else ()
k = p.pop('kwargs') if 'kwargs' in p else {}
# maybe there are some hanging kwargs in "parameters"
tmp_a = (expand_env_var(v) for v in a)
tmp_p = {kk: expand_env_var(vv) for kk, vv in {**k, **p}.items()}
obj = Flow(*tmp_a, **tmp_p)

pp = data.get('pods', [])
for pods in pp:
p_pod_attr = {kk: expand_env_var(vv) for kk, vv in pods.items()}
# in v1 YAML, flow is an optional argument
if p_pod_attr.get('name', None) != 'gateway':
# ignore gateway when reading, it will be added during build()
method = p_pod_attr.get('method', 'add')
# support methods: add, needs, inspect
getattr(obj, method)(**p_pod_attr, copy_flow=False)
return obj

def dump(self, data: 'Flow') -> Dict:
"""Return the dictionary given a versioned flow object

:param data: versioned flow object
"""
r = {}
if data._version:
r['version'] = data._version

if data._kwargs:
r['with'] = data._kwargs

if data._pod_nodes:
r['pods'] = []

last_name = 'gateway'
for k, v in data._pod_nodes.items():
if k == 'gateway':
continue
kwargs = {}
# only add "needs" when the value is not the last pod name
if list(v.needs) != [last_name]:
kwargs = {'needs': list(v.needs)}
kwargs.update(v._kwargs)
for t in _get_taboo():
if t in kwargs:
kwargs.pop(t)
last_name = kwargs['name']
r['pods'].append(kwargs)
return r
1 change: 0 additions & 1 deletion jina/peapods/pods/__init__.py
Expand Up @@ -117,7 +117,6 @@ def _parse_args(self, args: Namespace) -> Dict[str, Optional[Union[List[Namespac
peas_args['tail'] = _copy_to_tail_args(args)
peas_args['peas'] = _set_peas_args(args, peas_args.get('head', None), peas_args.get('tail', None))
else:
_set_after_to_pass(args)
self.is_head_router = False
self.is_tail_router = False
peas_args['peas'] = [args]
Expand Down
5 changes: 3 additions & 2 deletions jina/resources/helloworld.flow.index.yml
@@ -1,12 +1,13 @@
!Flow
version: '1.0'
with:
logserver: $WITH_LOGSERVER
compress_hwm: 1024
pods:
encode:
- name: encode
uses: $RESOURCE_DIR/helloworld.encoder.yml
parallel: $PARALLEL
index:
- name: index
uses: $RESOURCE_DIR/helloworld.indexer.yml
shards: $SHARDS
separated_workspace: true
5 changes: 3 additions & 2 deletions jina/resources/helloworld.flow.query.yml
@@ -1,13 +1,14 @@
!Flow
version: '1.0'
with:
read_only: true # better add this in the query time
logserver: $WITH_LOGSERVER
compress_hwm: 1024
pods:
encode:
- name: encode
uses: $RESOURCE_DIR/helloworld.encoder.yml
parallel: $PARALLEL
index:
- name: index
uses: $RESOURCE_DIR/helloworld.indexer.yml
shards: $SHARDS
separated_workspace: true
Expand Down
2 changes: 1 addition & 1 deletion jina/types/sets/document.py
Expand Up @@ -109,7 +109,7 @@ def all_embeddings(self) -> Tuple['np.ndarray', 'DocumentSet', 'DocumentSet']:
def all_contents(self) -> Tuple['np.ndarray', 'DocumentSet', 'DocumentSet']:
"""Return all embeddings from every document in this set as a ndarray

:return a tuple of embedding in :class:`np.ndarray`,
:return: a tuple of embedding in :class:`np.ndarray`,
the corresponding documents in a :class:`DocumentSet`,
and the documents have no contents in a :class:`DocumentSet`.
"""
Expand Down