refactor: remove flow optimization options (#1975)
JoanFM committed Feb 19, 2021
1 parent 0f2dc89 commit 97b6637
Showing 12 changed files with 47 additions and 276 deletions.
21 changes: 0 additions & 21 deletions jina/enums.py
@@ -3,19 +3,6 @@
To use these enums in a YAML config, follow the example below:
.. highlight:: yaml
.. code-block:: yaml
!Flow
with:
optimize_level: !FlowOptimizeLevel IGNORE_GATEWAY
# or
optimize_level: IGNORE_GATEWAY
#or
optimize_level: ignore_gateway
no_gateway: true
.. highlight:: yaml
.. code-block:: yaml
@@ -127,14 +114,6 @@ def is_block(self) -> bool:
return self.value == 2


class FlowOptimizeLevel(BetterEnum):
"""The level of flow optimization."""

NONE = 0
IGNORE_GATEWAY = 1
FULL = 2


class LogVerbosity(BetterEnum):
"""Verbosity level of the logger."""

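With FlowOptimizeLevel gone, the remaining enums are still addressable by name in YAML or Python. A minimal sketch, assuming FlowInspectType inherits the same `from_string` helper that the parser previously used for FlowOptimizeLevel:

    from jina.enums import FlowInspectType

    # Sketch only: from_string is the BetterEnum helper shown in the parser diff further below.
    level = FlowInspectType.from_string('COLLECT')
    assert level is FlowInspectType.COLLECT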
11 changes: 2 additions & 9 deletions jina/flow/base.py
@@ -12,7 +12,7 @@
from contextlib import ExitStack
from typing import Optional, Union, Tuple, List, Set, Dict, TextIO

from .builder import build_required, _build_flow, _optimize_flow, _hanging_pods
from .builder import build_required, _build_flow, _hanging_pods
from .. import __default_host__
from ..clients import Client, WebSocketClient
from ..enums import FlowBuildLevel, PodRoleType, FlowInspectType
@@ -50,11 +50,7 @@ class BaseFlow(JAMLCompatible, ExitStack, metaclass=FlowType):
.. highlight:: python
.. code-block:: python
f = Flow(optimize_level=FlowOptimizeLevel.NONE).add(uses='forward', parallel=3)
The optimized version, i.e. :code:`Flow(optimize_level=FlowOptimizeLevel.FULL)`
will generate 4 Peas, but it will force the :class:`GatewayPea` to take BIND role,
as the head and tail routers are removed.
f = Flow().add(uses='forward', parallel=3)
:param kwargs: other keyword arguments that will be shared by all Pods in this Flow
:param args: Namespace args
@@ -383,7 +379,6 @@ def build(self, copy_flow: bool = False) -> 'BaseFlow':

op_flow = copy.deepcopy(self) if copy_flow else self

_pod_edges = set()
if op_flow.args.inspect == FlowInspectType.COLLECT:
op_flow.gather_inspect(copy_flow=False)

@@ -410,10 +405,8 @@ def build(self, copy_flow: bool = False) -> 'BaseFlow':
if start not in op_flow._pod_nodes:
raise FlowMissingPodError(f'{start} is not in this flow, misspelled name?')
_outgoing_map[start].append(end)
_pod_edges.add((start, end))

op_flow = _build_flow(op_flow, _outgoing_map)
op_flow = _optimize_flow(op_flow, _outgoing_map, _pod_edges)
hanging_pods = _hanging_pods(op_flow)
if hanging_pods:
self.logger.warning(f'{hanging_pods} are hanging in this flow with no pod receiving from them, '
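With the optimization pass removed, build() simply wires pods from the outgoing map and warns about hanging pods. A minimal usage sketch based on the docstring above (the 'forward' executor name is taken from that docstring, not verified here):

    from jina import Flow

    f = Flow().add(uses='forward', parallel=3)
    with f:   # entering the context builds the flow; no _optimize_flow pass runs anymore
        pass  # f.index(...) / f.search(...) would go here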
47 changes: 1 addition & 46 deletions jina/flow/builder.py
@@ -3,7 +3,7 @@
from typing import Dict, List, Callable

from .. import __default_host__
from ..enums import SocketType, FlowOptimizeLevel, FlowBuildLevel, PodRoleType
from ..enums import SocketType, FlowBuildLevel, PodRoleType
from ..excepts import FlowBuildLevelError
from ..peapods.pods import _fill_in_host

@@ -138,48 +138,3 @@ def _connect(first: 'BasePod', second: 'BasePod', first_socket_type: 'SocketType
second.head_args.port_in = first.tail_args.port_out
else:
raise NotImplementedError(f'{first_socket_type!r} is not supported here')


def _optimize_flow(op_flow, outgoing_map: Dict[str, List[str]], pod_edges: {str, str}) -> 'Flow':
def _optimize_two_connections(flow: 'Flow', start_node_name: str, end_node_name: str):
""" THIS CODE IS NEVER TESTED AND THE LOGIC MAY NOT APPLIED ANYMORE
:param flow: the Flow we're optimizing
:param start_node_name: first node of connection
:param end_node_name: second node of connection
"""
start_node = flow._pod_nodes[start_node_name]
end_node = flow._pod_nodes[end_node_name]
edges_with_same_start = [ed for ed in pod_edges if ed[0].startswith(start_node_name)]
edges_with_same_end = [ed for ed in pod_edges if ed[1].endswith(end_node_name)]
if len(edges_with_same_start) > 1 or len(edges_with_same_end) > 1:
flow.logger.info(f'Connection between {start_node_name} and {end_node_name} cannot be optimized')
else:
if start_node.role == PodRoleType.GATEWAY:
if flow.args.optimize_level > FlowOptimizeLevel.IGNORE_GATEWAY and end_node.is_head_router:
flow.logger.info(
f'Node {end_node_name} connects to tail of {start_node_name}')
end_node.optimize_connect_to_tail_of(start_node)
elif end_node.role == PodRoleType.GATEWAY:
# TODO: this part of the code is never executed given the current optimization level. Never tested.
if flow.args.optimize_level > FlowOptimizeLevel.IGNORE_GATEWAY and \
start_node.is_tail_router and start_node.tail_args.num_part <= 1:
# connect gateway directly to peas only if this is unblock router
# as gateway can not block & reduce message
flow.logger.info(
f'Node {start_node_name} connects to head of {end_node_name}')
start_node.optimize_connect_to_head_of(end_node)
else:
if end_node.is_head_router and not start_node.is_tail_router:
flow.logger.info(
f'Node {end_node_name} connects to tail of {start_node_name}')
end_node.optimize_connect_to_tail_of(start_node)
elif start_node.is_tail_router and start_node.tail_args.num_part <= 1:
flow.logger.info(
f'Node {start_node_name} connects to head of {end_node_name}')
start_node.optimize_connect_to_head_of(end_node)

if op_flow.args.optimize_level > FlowOptimizeLevel.NONE:
return _traverse_graph(op_flow, outgoing_map, _optimize_two_connections)
else:
return op_flow
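For reference, the deleted _optimize_two_connections only bypassed a head or tail router when the candidate edge was the sole connection touching both pods. A standalone sketch of that guard (an illustrative helper, not part of the library; the original compared names with startswith/endswith):

    from typing import Set, Tuple

    def _is_sole_connection(pod_edges: Set[Tuple[str, str]], start: str, end: str) -> bool:
        """Return True when no other edge shares the start or end pod,
        i.e. the only case in which the removed optimization applied."""
        same_start = [e for e in pod_edges if e[0] == start]
        same_end = [e for e in pod_edges if e[1] == end]
        return len(same_start) <= 1 and len(same_end) <= 1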
9 changes: 1 addition & 8 deletions jina/parsers/flow.py
@@ -1,10 +1,7 @@
"""Argparser module for Flow"""
import argparse

from .base import set_base_parser
from .helper import _SHOW_ALL_ARGS
from .peapods.base import mixin_base_ppr_parser
from ..enums import FlowOptimizeLevel, FlowInspectType
from ..enums import FlowInspectType


def set_flow_parser(parser=None):
@@ -27,8 +24,4 @@ def set_flow_parser(parser=None):
If `REMOVE` is given then all inspect pods are removed when building the flow.
''')

parser.add_argument('--optimize-level', type=FlowOptimizeLevel.from_string, default=FlowOptimizeLevel.NONE,
help='removing redundant routers from the flow. Note, this may change the gateway zmq socket to BIND \
and hence not allow multiple clients connected to the gateway at the same time.'
if _SHOW_ALL_ARGS else argparse.SUPPRESS)
return parser
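After this change --optimize-level is no longer a recognized flag, and --inspect remains the only enum-valued Flow option in this parser. A quick sketch of the parser in use, assuming --inspect accepts the enum name as a string (as the help text above suggests):

    from jina.parsers.flow import set_flow_parser

    parser = set_flow_parser()
    args = parser.parse_args(['--inspect', 'REMOVE'])
    # parser.parse_args(['--optimize-level', 'NONE'])  # would now exit with
    # "unrecognized arguments", the standard argparse behaviour for a removed flag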
42 changes: 0 additions & 42 deletions jina/peapods/pods/__init__.py
@@ -252,48 +252,6 @@ def _set_conditional_args(args):
else:
args.runtime_cls = 'GRPCRuntime'

def optimize_connect_to_tail_of(self, incoming_pod: 'BasePod'):
"""Removes the `head` arguments to make sure that the Peas are connected directly to the
`tail` of the incoming pod.
:param incoming_pod: :class:`BasePod` that connects its tail to this :class:`BasePod` head
"""
if self.args.parallel > 1 and self.is_head_router:
# keep the port_in and socket_in of prev_args
# only reset its output
incoming_pod.tail_args = _copy_to_head_args(incoming_pod.tail_args, self.args.polling.is_push,
as_router=False)
# update peas to receive from it
self.peas_args['peas'] = _set_peas_args(self.args, incoming_pod.tail_args, self.tail_args)
# remove the head node
self.peas_args['head'] = None
# head is no longer a router anymore
self.is_head_router = False
self.deducted_head = incoming_pod.tail_args
else:
raise ValueError('the current pod has no head router, deducting the head is confusing')

def optimize_connect_to_head_of(self, outgoing_pod: 'BasePod'):
"""Removes the `tail` arguments to make sure that the Peas are connected directly to the
`head` of the outgoing pod.
:param outgoing_pod: :class:`BasePod` that this :class:`BasePod` tries to send data to
"""
if self.args.parallel > 1 and self.is_tail_router:
# keep the port_out and socket_out of next_args
# only reset its input
outgoing_pod.head_args = _copy_to_tail_args(outgoing_pod.head_args,
as_router=False)
# update peas to receive from it
self.peas_args['peas'] = _set_peas_args(self.args, self.head_args, outgoing_pod.head_args)
# remove the tail node
self.peas_args['tail'] = None
# tail is no longer a router anymore
self.is_tail_router = False
self.deducted_tail = outgoing_pod.head_args
else:
raise ValueError('the current pod has no tail router, deducting the tail is confusing')

@property
def is_ready(self) -> bool:
"""Checks if Pod is read.
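The two deleted methods did the actual rewiring: drop this pod's head (or tail) router and point its peas at the neighbouring pod's tail (or head) arguments. A self-contained sketch of the head-side case with simplified stand-in types (the real BasePod operates on argparse namespaces and ZMQ socket settings, so MiniPod and connect_to_tail_of below are illustrative only):

    from dataclasses import dataclass, field
    from typing import Any, Dict, Optional

    @dataclass
    class MiniPod:
        parallel: int = 1
        is_head_router: bool = False
        tail_args: Optional[Any] = None
        peas_args: Dict[str, Any] = field(
            default_factory=lambda: {'head': None, 'peas': [], 'tail': None})

    def connect_to_tail_of(pod: MiniPod, incoming: MiniPod) -> None:
        """Sketch of the removed optimization: delete this pod's head router so its
        peas read directly from the incoming pod's tail."""
        if not (pod.parallel > 1 and pod.is_head_router):
            raise ValueError('the pod has no head router to remove')
        pod.peas_args['head'] = None   # remove the head node
        pod.is_head_router = False     # head is no longer a router
        # in the real code the peas' input args were then rebound to incoming.tail_args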
40 changes: 24 additions & 16 deletions tests/unit/flow/test_flow.py
@@ -10,12 +10,12 @@
from jina.helper import random_identity
from jina.proto.jina_pb2 import DocumentProto
from jina.types.request import Response
from tests import random_docs, rm_files
from tests import random_docs

cur_dir = os.path.dirname(os.path.abspath(__file__))


def test_flow_with_jump():
def test_flow_with_jump(tmpdir):
def _validate(f):
node = f._pod_nodes['gateway']
assert node.head_args.socket_in == SocketType.PULL_CONNECT
@@ -64,14 +64,12 @@ def _validate(f):
with f:
_validate(f)

f.save_config('tmp.yml')
Flow.load_config('tmp.yml')
f.save_config(os.path.join(str(tmpdir), 'tmp.yml'))
Flow.load_config(os.path.join(str(tmpdir), 'tmp.yml'))

with Flow.load_config('tmp.yml') as f:
with Flow.load_config(os.path.join(str(tmpdir), 'tmp.yml')) as f:
_validate(f)

rm_files(['tmp.yml'])


@pytest.mark.parametrize('restful', [False, True])
def test_simple_flow(restful):
@@ -108,7 +106,7 @@ def bytes_fn():
assert node.peas_args['peas'][0] == node.tail_args


def test_flow_identical():
def test_flow_identical(tmpdir):
with open(os.path.join(cur_dir, '../yaml/test-flow.yml')) as fp:
a = Flow.load_config(fp)

@@ -118,9 +116,9 @@ def test_flow_identical():
.add(name='encode2', parallel=2, needs='chunk_seg')
.join(['wqncode1', 'encode2']))

a.save_config('test2.yml')
a.save_config(os.path.join(str(tmpdir), 'test2.yml'))

c = Flow.load_config('test2.yml')
c = Flow.load_config(os.path.join(str(tmpdir), 'test2.yml'))

assert a == b
assert a == c
@@ -157,8 +155,6 @@ def test_flow_identical():
assert node.tail_args.socket_in == SocketType.PULL_BIND
assert node.tail_args.socket_out == SocketType.PUSH_CONNECT

rm_files(['test2.yml'])


@pytest.mark.parametrize('restful', [False, True])
def test_flow_no_container(restful):
@@ -169,13 +165,19 @@ def test_flow_no_container(restful):
f.index(input_fn=random_docs(10))


def test_shards():
@pytest.fixture
def docpb_workspace(tmpdir):
os.environ['TEST_DOCSHARD_WORKSPACE'] = str(tmpdir)
yield
del os.environ['TEST_DOCSHARD_WORKSPACE']


def test_shards(docpb_workspace):
f = Flow().add(name='doc_pb', uses=os.path.join(cur_dir, '../yaml/test-docpb.yml'), parallel=3)
with f:
f.index(input_fn=random_docs(1000), random_doc_id=False)
with f:
pass
rm_files(['test-docshard-tmp'])


def test_py_client():
@@ -368,8 +370,15 @@ def test_refactor_num_part_2(restful):
f.index(['abbcs', 'efgh'])


@pytest.fixture()
def datauri_workspace(tmpdir):
os.environ['TEST_DATAURIINDEX_WORKSPACE'] = str(tmpdir)
yield
del os.environ['TEST_DATAURIINDEX_WORKSPACE']


@pytest.mark.parametrize('restful', [False, True])
def test_index_text_files(mocker, restful):
def test_index_text_files(mocker, restful, datauri_workspace):
def validate(req):
assert len(req.docs) > 0
for d in req.docs:
@@ -383,7 +392,6 @@ def validate(req):
with f:
f.index_files('*.py', on_done=response_mock)

rm_files(['doc.gzip'])
response_mock.assert_called()


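The tests above now lean on pytest's tmpdir plus small env-var fixtures instead of manual rm_files cleanup. The same pattern can be written with monkeypatch, which restores the environment automatically; a sketch (monkeypatch is a substitution, not what the commit uses):

    import pytest

    @pytest.fixture
    def docpb_workspace(tmpdir, monkeypatch):
        # equivalent to the fixture in the diff, with automatic env restoration
        monkeypatch.setenv('TEST_DOCSHARD_WORKSPACE', str(tmpdir))
        return str(tmpdir)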
12 changes: 9 additions & 3 deletions tests/unit/flow/test_flow_index.py
@@ -5,7 +5,7 @@

from jina.flow import Flow
from jina.proto import jina_pb2
from tests import random_docs, rm_files
from tests import random_docs

cur_dir = os.path.dirname(os.path.abspath(__file__))

@@ -20,9 +20,16 @@ def random_queries(num_docs, chunks_per_doc=5):
yield d


@pytest.fixture
def docpb_workspace(tmpdir):
os.environ['TEST_DOCSHARD_WORKSPACE'] = str(tmpdir)
yield
del os.environ['TEST_DOCSHARD_WORKSPACE']


@pytest.mark.skipif('GITHUB_WORKFLOW' in os.environ, reason='skip the network test on github workflow')
@pytest.mark.parametrize('restful', [False, True])
def test_shards_insufficient_data(mocker, restful):
def test_shards_insufficient_data(mocker, restful, docpb_workspace):
"""THIS IS SUPER IMPORTANT FOR TESTING SHARDS
IF THIS FAILS, DON'T IGNORE IT, DEBUG IT
@@ -63,5 +70,4 @@

on_done=validate)
time.sleep(2)
rm_files(['test-docshard-tmp'])
mock.assert_called_once()
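Since docpb_workspace is now defined in both test_flow.py and test_flow_index.py, the duplication could later be lifted into a shared conftest.py; a sketch of that (hypothetical file, not part of this commit):

    # tests/unit/flow/conftest.py (hypothetical)
    import os
    import pytest

    @pytest.fixture
    def docpb_workspace(tmpdir):
        os.environ['TEST_DOCSHARD_WORKSPACE'] = str(tmpdir)
        yield
        del os.environ['TEST_DOCSHARD_WORKSPACE']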
20 changes: 8 additions & 12 deletions tests/unit/flow/test_flow_yaml_parser.py
@@ -1,17 +1,16 @@
from pathlib import Path
import os

from pathlib import Path
import numpy as np
import pytest

from jina import Flow, AsyncFlow
from jina.enums import FlowOptimizeLevel
from jina.excepts import BadFlowYAMLVersion
from jina.executors.encoders import BaseEncoder
from jina.flow import BaseFlow
from jina.jaml import JAML
from jina.jaml.parsers import get_supported_versions
from jina.parsers.flow import set_flow_parser
from tests import rm_files

cur_dir = Path(__file__).parent

@@ -77,17 +76,14 @@ def test_load_flow_from_cli():

def test_load_flow_from_yaml():
with open(cur_dir.parent / 'yaml' / 'test-flow.yml') as fp:
a = Flow.load_config(fp)

_ = Flow.load_config(fp)

def test_flow_yaml_dump():
f = Flow(optimize_level=FlowOptimizeLevel.IGNORE_GATEWAY,
no_gateway=True)
f.save_config('test1.yml')

fl = Flow.load_config('test1.yml')
assert f.args.optimize_level == fl.args.optimize_level
rm_files(['test1.yml'])
def test_flow_yaml_dump(tmpdir):
f = Flow()
f.save_config(os.path.join(str(tmpdir), 'test1.yml'))
fl = Flow.load_config(os.path.join(str(tmpdir), 'test1.yml'))
assert f.args.inspect == fl.args.inspect


def test_flow_yaml_from_string():
