fix(daemon): remote flow with parallel>1 (#3469)
* test(daemon): remote flow with parallel>1

* fix(daemon): remote flow with parallel>1

* fix(daemon): set workspace 3475

* fix: add grpc data runtime
deepankarm committed Sep 24, 2021
1 parent 707b7aa commit 48285d1
Showing 11 changed files with 137 additions and 10 deletions.
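
A minimal sketch of the scenario this commit fixes, mirroring the new test added in tests/distributed/test_topologies/test_topologies.py below: a Flow is created on a remote JinaD instance from an uploaded workspace, one of its Executors runs with parallel > 1, and a client sends requests through the remote gateway. Host, port and file names are placeholders taken from the tests, not a prescribed setup.

from daemon.clients import JinaDClient
from jina import Client, Document, __default_host__

# assumes a JinaD instance listening on port 8000, as in the tests below
client = JinaDClient(host=__default_host__, port=8000)

# upload the YAML/Python files and create a workspace on the daemon
workspace_id = client.workspaces.create(paths=['yamls'])

# start a remote Flow whose Executor gets parallel > 1 via env substitution
flow_id = client.flows.create(
    workspace_id=workspace_id, filename='flow_glg.yml', envs={'PARALLEL': 2}
)

# read back the gateway arguments and index through the remote Flow
args = client.flows.get(flow_id)['arguments']['object']['arguments']
Client(
    host=__default_host__, port=args['port_expose'], protocol=args['protocol']
).post(on='/', inputs=[Document(text='hello')])

# clean up
assert client.flows.delete(flow_id)
assert client.workspaces.delete(workspace_id)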
12 changes: 9 additions & 3 deletions daemon/api/dependencies.py
@@ -138,8 +138,14 @@ def load_and_dump(self) -> None:
for pod_name, pod in f._pod_nodes.items():
runtime_cls = update_runtime_cls(pod.args, copy=True).runtime_cls
if isinstance(pod, CompoundPod):
if runtime_cls in ['ZEDRuntime', 'ContainerRuntime'] + list(
GATEWAY_RUNTIME_DICT.values()
if (
runtime_cls
in [
'ZEDRuntime',
'GRPCDataRuntime',
'ContainerRuntime',
]
+ list(GATEWAY_RUNTIME_DICT.values())
):
# For a `CompoundPod`, publish ports for head Pea & tail Pea
# Check daemon.stores.partial.PartialFlowStore.add() for additional logic
@@ -162,7 +168,7 @@ def load_and_dump(self) -> None:
)
)
else:
if runtime_cls in ['ZEDRuntime'] + list(
if runtime_cls in ['ZEDRuntime', 'GRPCDataRuntime'] + list(
GATEWAY_RUNTIME_DICT.values()
):
pod.args.runs_in_docker = True
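
The runtime-class membership tests above differ slightly between the two branches: the CompoundPod branch also covers ContainerRuntime, while the plain-Pod branch only covers the runtimes that JinaD starts directly. A hypothetical refactor that names the two sets explicitly; all identifiers below are illustrative and not part of this commit.

from daemon.models import GATEWAY_RUNTIME_DICT

_GATEWAY_RUNTIMES = set(GATEWAY_RUNTIME_DICT.values())

# runtimes whose head/tail ports must be published for a remote CompoundPod
_COMPOUND_POD_RUNTIMES = {
    'ZEDRuntime',
    'GRPCDataRuntime',
    'ContainerRuntime',
} | _GATEWAY_RUNTIMES

# runtimes that get `runs_in_docker = True` when a plain Pod is created via JinaD
_PLAIN_POD_RUNTIMES = {'ZEDRuntime', 'GRPCDataRuntime'} | _GATEWAY_RUNTIMES


def is_compound_pod_runtime(runtime_cls: str) -> bool:
    return runtime_cls in _COMPOUND_POD_RUNTIMES


def is_plain_pod_runtime(runtime_cls: str) -> bool:
    return runtime_cls in _PLAIN_POD_RUNTIMES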
3 changes: 3 additions & 0 deletions daemon/clients/flows.py
@@ -67,6 +67,9 @@ async def create(
f'{self._kind.title()} creation failed as: {error_msg}'
)
return error_msg
self._logger.success(
f'Remote Flow created successfully with id {response_json}'
)
return response_json

@if_alive
4 changes: 2 additions & 2 deletions daemon/helper.py
@@ -99,8 +99,8 @@ def error_msg_from(response: Dict) -> str:
:param response: dict response
:return: prettified response string
"""
assert 'detail' in response, '\'detail\' not found in response'
assert 'body' in response, '\'body\' not found in response'
if 'detail' not in response or 'body' not in response:
return response
if response['detail'] == PartialDaemon400Exception.__name__:
return response['body']
return (
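
A short sketch of the behavioural change above: responses that lack `detail` or `body` are now passed through unchanged instead of tripping an `AssertionError`. The function below is a simplified stand-in for daemon.helper.error_msg_from, not the real implementation — the exception name is hard-coded and the final prettifying branch (truncated in the diff) is replaced by placeholder formatting.

from typing import Dict, Union


def error_msg_from(response: Dict) -> Union[str, Dict]:
    """Simplified, illustrative stand-in for the real helper."""
    if 'detail' not in response or 'body' not in response:
        # previously two asserts: any payload without these keys raised AssertionError
        return response
    if response['detail'] == 'PartialDaemon400Exception':
        return response['body']
    # placeholder for the prettified `detail` + `body` message built by the real helper
    return f"{response['detail']}: {response['body']}"


# a payload without `detail`/`body` is returned as-is rather than raising
assert error_msg_from({'message': 'ok'}) == {'message': 'ok'}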
22 changes: 19 additions & 3 deletions daemon/stores/partial.py
@@ -8,7 +8,7 @@
from jina import Flow, __docker_host__
from jina.logging.logger import JinaLogger

from .. import jinad_args
from .. import jinad_args, __partial_workspace__
from ..models import GATEWAY_RUNTIME_DICT
from ..models.enums import UpdateOperation
from ..models.ports import Ports, PortMappings
@@ -108,6 +108,7 @@ def add(

self.object: Flow = Flow.load_config(yaml_source).build()
self.object.workspace_id = jinad_args.workspace_id
self.object.workspace = __partial_workspace__

for pod in self.object._pod_nodes.values():
runtime_cls = update_runtime_cls(pod.args, copy=True).runtime_cls
@@ -116,8 +117,14 @@ def add(
# `runs_in_docker` to be False. Since `Flow` args are sent to all Pods, `runs_in_docker` gets set
# for the `CompoundPod`, which blocks the requests. Below we unset that (hacky & ugly).
# We do it only for runtimes that start locally (not in a container or on a remote host)
if runtime_cls in ['ZEDRuntime', 'ContainerRuntime'] + list(
GATEWAY_RUNTIME_DICT.values()
if (
runtime_cls
in [
'ZEDRuntime',
'GRPCDataRuntime',
'ContainerRuntime',
]
+ list(GATEWAY_RUNTIME_DICT.values())
):
pod.args.runs_in_docker = False
for replica_args in pod.replicas_args:
@@ -144,6 +151,15 @@ def add(
pod.replicas_args = CompoundPod._set_replica_args(
pod.args, pod.head_args, pod.tail_args
)
else:
# avoid setting runs_in_docker for Pods with parallel > 1 that use `ZEDRuntime`/`GRPCDataRuntime`,
# else replica-Peas would try connecting to the head/tail Pea via __docker_host__
if (
runtime_cls in ['ZEDRuntime', 'GRPCDataRuntime']
and pod.args.parallel > 1
):
pod.args.runs_in_docker = False
pod.update_pea_args()

self.object = self.object.__enter__()
except Exception as e:
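
The core of the fix is the new `else` branch: inside the JinaD container the Flow-level args (including `runs_in_docker`) are copied onto every Pod, so a plain Pod with `parallel > 1` on `ZEDRuntime`/`GRPCDataRuntime` would have its replica-Peas dial the head/tail Pea via `__docker_host__` instead of localhost. A compact sketch of that decision, with `should_unset_runs_in_docker` and `fix_pod_args` as illustrative names only:

def should_unset_runs_in_docker(runtime_cls: str, parallel: int) -> bool:
    # replicas of a locally started runtime must reach their head/tail Pea on
    # localhost, not via __docker_host__, so the inherited flag has to go
    return runtime_cls in ('ZEDRuntime', 'GRPCDataRuntime') and parallel > 1


def fix_pod_args(pod, runtime_cls: str) -> None:
    # mirrors the plain-Pod branch of PartialFlowStore.add() above
    if should_unset_runs_in_docker(runtime_cls, pod.args.parallel):
        pod.args.runs_in_docker = False
        pod.update_pea_args()  # rebuild head/tail/replica Pea args from pod.args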
6 changes: 6 additions & 0 deletions jina/flow/base.py
@@ -1584,6 +1584,12 @@ def workspace(self) -> str:
.. # noqa: DAR201"""
return os.path.abspath(self.args.workspace or './')

@workspace.setter
def workspace(self, value: str):
self.args.workspace = value
for k, p in self:
p.args.workspace = value

@property
def workspace_id(self) -> Dict[str, str]:
"""Get all Pods' ``workspace_id`` values in a dict
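
A minimal usage sketch of the new setter: assigning to `Flow.workspace` now pushes the directory into every Pod's args, which is what lets `PartialFlowStore.add()` above point a freshly loaded Flow at `__partial_workspace__`. The executor name passed to `.add()` and the path are arbitrary examples.

from jina import Flow

f = Flow().add(name='indexer')

# the setter writes both the Flow-level args and every Pod's args
f.workspace = '/workspace/my_flow'

assert f.workspace == '/workspace/my_flow'
assert all(p.args.workspace == '/workspace/my_flow' for _, p in f)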
64 changes: 63 additions & 1 deletion tests/distributed/test_topologies/test_topologies.py
@@ -1,8 +1,11 @@
import os

import pytest
import numpy as np

from jina import Flow, Document
from daemon.clients import JinaDClient
from daemon import __partial_workspace__
from jina import Flow, Document, Client, __default_host__

cur_dir = os.path.dirname(os.path.abspath(__file__))

@@ -36,6 +39,7 @@ def test_r_l_simple(silent_log, parallels, mocker):
f.index(
inputs=(Document(text='hello') for _ in range(NUM_DOCS)),
on_done=response_mock,
show_progress=True,
)

response_mock.assert_called()
@@ -50,6 +54,7 @@ def test_l_r_simple(parallels, mocker):
f.index(
inputs=(Document(text='hello') for _ in range(NUM_DOCS)),
on_done=response_mock,
show_progress=True,
)
response_mock.assert_called()

@@ -68,6 +73,7 @@ def test_r_l_r_simple(parallels, mocker):
f.index(
inputs=(Document(text='hello') for _ in range(NUM_DOCS)),
on_done=response_mock,
show_progress=True,
)
response_mock.assert_called()

@@ -86,6 +92,7 @@ def test_r_r_r_simple(parallels, mocker):
f.index(
inputs=(Document(text='hello') for _ in range(NUM_DOCS)),
on_done=response_mock,
show_progress=True,
)
response_mock.assert_called()

@@ -99,6 +106,7 @@ def test_l_r_l_simple(parallels, mocker):
f.index(
inputs=(Document(text='hello') for _ in range(NUM_DOCS)),
on_done=response_mock,
show_progress=True,
)
response_mock.assert_called()

@@ -144,3 +152,57 @@ def test_complex_needs(parallels, mocker):
on_done=response_mock,
)
response_mock.assert_called()


@pytest.mark.parametrize('parallel', [1, 2])
def test_remote_flow_local_executors(mocker, parallel):

client = JinaDClient(host=__default_host__, port=8000)
workspace_id = client.workspaces.create(paths=[os.path.join(cur_dir, 'yamls')])

GATEWAY_LOCAL_GATEWAY = 'flow_glg.yml'
GATEWAY_LOCAL_LOCAL_GATEWAY = 'flow_gllg.yml'

for flow_yaml in [
GATEWAY_LOCAL_GATEWAY,
GATEWAY_LOCAL_LOCAL_GATEWAY,
]:
response_mock = mocker.Mock()
flow_id = client.flows.create(
workspace_id=workspace_id, filename=flow_yaml, envs={'PARALLEL': parallel}
)
args = client.flows.get(flow_id)['arguments']['object']['arguments']
Client(
host=__default_host__,
port=args['port_expose'],
protocol=args['protocol'],
).post(
on='/',
inputs=(Document(blob=np.random.random([1, 100])) for _ in range(NUM_DOCS)),
on_done=response_mock,
show_progress=True,
)
response_mock.assert_called()
assert client.flows.delete(flow_id)

assert client.workspaces.delete(workspace_id)


def test_remote_workspace_value():
HOST = __default_host__ # '3.208.18.63'
client = JinaDClient(host=HOST, port=8000)
workspace_id = client.workspaces.create(paths=[os.path.join(cur_dir, 'yamls')])
flow_id = client.flows.create(
workspace_id=workspace_id, filename='flow_workspace_validate.yml'
)
args = client.flows.get(flow_id)['arguments']['object']['arguments']
response = Client(
host=HOST, port=args['port_expose'], protocol=args['protocol']
).post(on='/', inputs=[Document()], show_progress=True, return_results=True)
assert (
response[0]
.data.docs[0]
.text.startswith(f'{__partial_workspace__}/WorkspaceValidator/0')
)
assert client.flows.delete(flow_id)
assert client.workspaces.delete(workspace_id)
7 changes: 7 additions & 0 deletions tests/distributed/test_topologies/yamls/executor.py
@@ -0,0 +1,7 @@
from jina import Executor, requests, DocumentArray


class WorkspaceValidator(Executor):
@requests
def foo(self, docs: DocumentArray, *args, **kwargs):
docs[0].text = self.workspace
8 changes: 8 additions & 0 deletions tests/distributed/test_topologies/yamls/flow_glg.yml
@@ -0,0 +1,8 @@
jtype: Flow
version: 1
with:
port_expose: 9000
protocol: http
executors:
- name: local
parallel: $PARALLEL
10 changes: 10 additions & 0 deletions tests/distributed/test_topologies/yamls/flow_gllg.yml
@@ -0,0 +1,10 @@
jtype: Flow
version: 1
with:
port_expose: 9000
protocol: http
executors:
- name: local1
parallel: $PARALLEL
- name: local2
parallel: $PARALLEL
@@ -0,0 +1,9 @@
jtype: Flow
version: 1
with:
port_expose: 9000
protocol: http
executors:
- name: local
uses: WorkspaceValidator
py_module: executor.py
@@ -127,7 +127,6 @@ def test_l_r_l_docker(parallels, docker_image, mocker):


def test_remote_flow_containerized_executors(docker_image, mocker):
response_mock = mocker.Mock()
client = JinaDClient(host=__default_host__, port=8000)
workspace_id = client.workspaces.create(paths=[os.path.join(cur_dir, 'yamls')])

Expand All @@ -142,6 +141,7 @@ def test_remote_flow_containerized_executors(docker_image, mocker):
GATEWAY_LOCAL_CONTAINER_GATEWAY,
GATEWAY_CONTAINER_LOCAL_CONTAINER_GATEWAY,
]:
response_mock = mocker.Mock()
flow_id = client.flows.create(workspace_id=workspace_id, filename=flow_yaml)
args = client.flows.get(flow_id)['arguments']['object']['arguments']
Client(
