feat(daemon): add env to partial daemon (#3996)
* feat: add env to partial daemon and store

* feat: add docstrings

* feat: set envs in partial flow and peapod

* feat(daemon): use dict as default env value

* feat(daemon): add unit test for env setting

* feat: add env setting test

* feat(daemon): refine test structure

* feat(daemon): adjust jinad test

* feat(daemon): finish distributed testing

* feat(daemon): fix test ip

* feat(daemon): refine default host

* test: refactor env var tests

* ci: cleanup check for distributed

* feat(daemon): fix partial client test

* test: fix docs csv path

Co-authored-by: Deepankar Mahapatro <deepankar.mahapatro@jina.ai>
bwanglzu and deepankarm committed Nov 29, 2021
1 parent 9db3de1 commit 2cd9f31
Showing 21 changed files with 131 additions and 24 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/cd.yml
@@ -164,7 +164,10 @@ jobs:
if [[ "${{ matrix.test-path }}" =~ ^tests/distributed/* || "${{ matrix.test-path }}" =~ ^tests/daemon/* ]]; then
# Build daemon for all daemon/distributed tests
docker build -f Dockerfiles/debianx.Dockerfile --build-arg PIP_TAG=daemon -t jinaai/jina:test-daemon .
if [[ "${{ matrix.test-path }}" =~ ^(tests/distributed/test_topologies/|tests/distributed/test_topologies_docker/|tests/distributed/test_workspaces/|tests/distributed/test_dir_structures/|tests/distributed/test_scale_remote/|tests/distributed/test_scale_remote_executors/)$ ]]; then
declare -a distributed_tests=("tests/distributed/test_topologies/" "tests/distributed/test_topologies_docker/" "tests/distributed/test_workspaces/" "tests/distributed/test_dir_structures/" "tests/distributed/test_scale_remote/" "tests/distributed/test_scale_remote_executors/" "tests/distributed/test_env_vars/")
if [[ " ${distributed_tests[*]} " =~ "${{ matrix.test-path }}" ]]; then
docker run --add-host=host.docker.internal:host-gateway --name jinad --env JINA_DAEMON_BUILD=DEVEL -v /var/run/docker.sock:/var/run/docker.sock -v /tmp/jinad:/tmp/jinad -p 8000:8000 -d jinaai/jina:test-daemon
fi
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=360 -v -s ${{ matrix.test-path }}
5 changes: 4 additions & 1 deletion .github/workflows/ci.yml
@@ -250,7 +250,10 @@ jobs:
if [[ "${{ matrix.test-path }}" =~ ^tests/distributed/* || "${{ matrix.test-path }}" =~ ^tests/daemon/* ]]; then
# Build daemon for all daemon/distributed tests
docker build -f Dockerfiles/debianx.Dockerfile --build-arg PIP_TAG=daemon -t jinaai/jina:test-daemon .
if [[ "${{ matrix.test-path }}" =~ ^(tests/distributed/test_topologies/|tests/distributed/test_topologies_docker/|tests/distributed/test_workspaces/|tests/distributed/test_dir_structures/|tests/distributed/test_scale_remote/|tests/distributed/test_scale_remote_executors/)$ ]]; then
declare -a distributed_tests=("tests/distributed/test_topologies/" "tests/distributed/test_topologies_docker/" "tests/distributed/test_workspaces/" "tests/distributed/test_dir_structures/" "tests/distributed/test_scale_remote/" "tests/distributed/test_scale_remote_executors/" "tests/distributed/test_env_vars/")
if [[ " ${distributed_tests[*]} " =~ "${{ matrix.test-path }}" ]]; then
docker run --add-host=host.docker.internal:host-gateway --name jinad --env JINA_DAEMON_BUILD=DEVEL -v /var/run/docker.sock:/var/run/docker.sock -v /tmp/jinad:/tmp/jinad -p 8000:8000 -d jinaai/jina:test-daemon
fi
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=360 -v -s ${{ matrix.test-path }}
6 changes: 4 additions & 2 deletions daemon/api/endpoints/partial/flow.py
@@ -30,14 +30,16 @@ async def _status():
status_code=201,
response_model=PartialFlowItem,
)
async def _create(flow: 'FlowModel', ports: Optional[PortMappings] = None):
async def _create(
flow: 'FlowModel', ports: Optional[PortMappings] = None, envs: Optional[Dict] = {}
):
"""
.. #noqa: DAR101
.. #noqa: DAR201"""
try:
args = ArgNamespace.kwargs2namespace(flow.dict(), set_flow_parser())
return store.add(args, ports)
return store.add(args, ports, envs)
except Exception as ex:
raise PartialDaemon400Exception from ex

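With this change the partial-daemon `/flow` endpoint accepts an optional `envs` mapping next to the flow model and port mappings. A minimal sketch of a direct call, assuming a partial daemon reachable at a hypothetical http://localhost:8001 and using the body shape that FlowStore.add_in_partial sends (see daemon/stores/flows.py further down):

import requests  # hypothetical direct call; in practice FlowStore.add_in_partial builds this request

PARTIAL_DAEMON = 'http://localhost:8001'  # assumed address of the partial daemon

payload = {
    'flow': {'uses': 'flow.yml'},     # FlowModel fields as a dict
    'ports': [],                      # optional PortMappings
    'envs': {'key1': 'val1'},         # new: env vars forwarded to the partial Flow
}
resp = requests.post(f'{PARTIAL_DAEMON}/flow', json=payload)
assert resp.status_code == 201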
5 changes: 3 additions & 2 deletions daemon/api/endpoints/partial/pea.py
@@ -1,3 +1,4 @@
from typing import Optional, Dict
from fastapi import APIRouter

from jina.helper import ArgNamespace
@@ -29,14 +30,14 @@ async def _status():
status_code=201,
response_model=PartialStoreItem,
)
async def _create(pea: 'PeaModel'):
async def _create(pea: 'PeaModel', envs: Optional[Dict] = {}):
"""
.. #noqa: DAR101
.. #noqa: DAR201"""
try:
args = ArgNamespace.kwargs2namespace(pea.dict(), set_pea_parser())
return store.add(args)
return store.add(args, envs)
except Exception as ex:
raise PartialDaemon400Exception from ex

4 changes: 2 additions & 2 deletions daemon/api/endpoints/partial/pod.py
@@ -31,14 +31,14 @@ async def _status():
status_code=201,
response_model=PartialStoreItem,
)
async def _create(pod: 'PodModel'):
async def _create(pod: 'PodModel', envs: Optional[Dict] = {}):
"""
.. #noqa: DAR101
.. #noqa: DAR201"""
try:
args = ArgNamespace.kwargs2namespace(pod.dict(), set_pod_parser())
return store.add(args)
return store.add(args, envs)
except Exception as ex:
raise PartialDaemon400Exception from ex

2 changes: 1 addition & 1 deletion daemon/stores/containers.py
@@ -39,7 +39,7 @@ class ContainerStore(BaseStore, ABC):
_status_model = ContainerStoreStatus

@abstractmethod
async def add_in_partial(self, uri, *args, **kwargs):
async def add_in_partial(self, uri, envs, *args, **kwargs):
"""Implements jina object creation in `partial-daemon`
.. #noqa: DAR101"""
7 changes: 5 additions & 2 deletions daemon/stores/flows.py
@@ -14,19 +14,22 @@ class FlowStore(ContainerStore, AiohttpMixin):

_kind = 'flow'

async def add_in_partial(self, uri: str, params: Dict, **kwargs) -> Dict:
async def add_in_partial(
self, uri: str, params: Dict, envs: Optional[Dict] = {}, **kwargs
) -> Dict:
"""Sends `POST` request to `partial-daemon` to create a Flow.
:param uri: uri of partial-daemon
:param params: Flow params
:param envs: environment variables to be passed into partial flow
:param kwargs: keyword args
:return: response from POST request
"""
ports = kwargs.get('ports', [])
return await self.POST(
url=f'{uri}/{self._kind}',
params=None,
json={'flow': params, 'ports': ports},
json={'flow': params, 'ports': ports, 'envs': envs},
)

async def delete_in_partial(self, uri: str, **kwargs) -> Dict:
17 changes: 12 additions & 5 deletions daemon/stores/partial.py
@@ -48,10 +48,13 @@ class PartialPeaStore(PartialStore):

peapod_cls = Pea

def add(self, args: Namespace, **kwargs) -> PartialStoreItem:
def add(
self, args: Namespace, envs: Optional[Dict] = {}, **kwargs
) -> PartialStoreItem:
"""Starts a Pea in `partial-daemon`
:param args: namespace args for the pea/pod
:param envs: environment variables to be passed into partial pea/pod
:param kwargs: keyword args
:return: Item describing the Pea object
"""
@@ -62,6 +65,7 @@ def add(self, args: Namespace, **kwargs) -> PartialStoreItem:
if args.runtime_cls == 'ContainerRuntime':
args.docker_kwargs = {'extra_hosts': {__docker_host__: 'host-gateway'}}
self.object: Union['Pea', 'Pod'] = self.peapod_cls(args).__enter__()
self.object.env = envs
except Exception as e:
if hasattr(self, 'object'):
self.object.__exit__(type(e), e, e.__traceback__)
@@ -117,12 +121,17 @@ class PartialFlowStore(PartialStore):
"""A Flow store spawned inside partial-daemon container"""

def add(
self, args: Namespace, port_mapping: Optional[PortMappings] = None, **kwargs
self,
args: Namespace,
port_mapping: Optional[PortMappings] = None,
envs: Optional[Dict] = {},
**kwargs,
) -> PartialStoreItem:
"""Starts a Flow in `partial-daemon`.
:param args: namespace args for the flow
:param port_mapping: ports to be set
:param envs: environment variables to be passed into partial flow
:param kwargs: keyword args
:return: Item describing the Flow object
"""
@@ -135,9 +144,7 @@ def add(
self.object: Flow = Flow.load_config(args.uses).build()
self.object.workspace_id = jinad_args.workspace_id
self.object.workspace = __partial_workspace__
self.object.env = {'HOME': __partial_workspace__}
# TODO(Deepankar): pass envs from main daemon process to partial-daemon so that
# Pods/Peas/Runtimes/Executors can inherit these env vars
self.object.env = {'HOME': __partial_workspace__, **envs}

for pod in self.object._pod_nodes.values():
runtime_cls = update_runtime_cls(pod.args, copy=True).runtime_cls
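Inside PartialFlowStore.add, the caller-supplied `envs` are unpacked on top of the default `HOME` entry, so extra variables are added and, if a caller insists, `HOME` itself can be overridden. A minimal sketch of the merge semantics (plain Python, hypothetical values):

partial_workspace = '/workspace'                    # stand-in for __partial_workspace__
envs = {'key1': 'val1', 'HOME': '/custom'}          # hypothetical caller input

merged = {'HOME': partial_workspace, **envs}        # same pattern as in the store
assert merged == {'HOME': '/custom', 'key1': 'val1'}   # later keys win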
9 changes: 6 additions & 3 deletions daemon/stores/peas.py
@@ -1,4 +1,4 @@
from typing import Dict
from typing import Dict, Optional

from .mixin import AiohttpMixin
from .containers import ContainerStore
@@ -9,18 +9,21 @@ class PeaStore(ContainerStore, AiohttpMixin):

_kind = 'pea'

async def add_in_partial(self, uri: str, params: Dict, **kwargs) -> Dict:
async def add_in_partial(
self, uri: str, params: Dict, envs: Optional[Dict] = {}, **kwargs
) -> Dict:
"""Sends `POST` request to `partial-daemon` to create a Pea/Pod.
:param uri: uri of partial-daemon
:param params: json payload to be sent
:param envs: environment variables to be passed into partial pea
:param kwargs: keyword args
:return: response from mini-jinad
"""
return await self.POST(
url=f'{uri}/{self._kind}',
params=None,
json=params,
json={self._kind: params, 'envs': envs},
)

async def delete_in_partial(self, uri, **kwargs) -> Dict:
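Note that this changes the body the main daemon posts to the partial pea/pod endpoint: previously the model dict was sent as-is, now it is nested under its kind next to `envs`, matching the new endpoint signatures above and the unit tests below. A sketch of the two shapes (illustrative field values only):

# old body: the PeaModel dict was the entire JSON payload
old_body = {'name': 'my_pea'}

# new body: model nested under its kind, plus the optional env mapping
new_body = {
    'pea': {'name': 'my_pea'},      # PeaModel fields; the '/pod' endpoint expects a 'pod' key instead
    'envs': {'key1': 'val1'},       # forwarded to the Pea's env in the partial store
}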
5 changes: 4 additions & 1 deletion tests/daemon/unit/api/endpoints/partial/test_pea.py
@@ -9,7 +9,10 @@
def test_pea_api(partial_pea_client):
pea_model = PeaModel()

response = partial_pea_client.post(api, json=pea_model.dict(exclude={'log_config'}))
response = partial_pea_client.post(
api,
json={'pea': pea_model.dict(exclude={'log_config'}), 'envs': {'key1': 'val1'}},
)
assert response

response = partial_pea_client.get(api)
5 changes: 4 additions & 1 deletion tests/daemon/unit/api/endpoints/partial/test_pod.py
@@ -9,7 +9,10 @@
def test_pod_api(partial_pod_client):
pod_model = PodModel()

response = partial_pod_client.post(api, json=pod_model.dict(exclude={'log_config'}))
response = partial_pod_client.post(
api,
json={'pod': pod_model.dict(exclude={'log_config'}), 'envs': {'key1': 'val1'}},
)
assert response

response = partial_pod_client.get(api)
7 changes: 5 additions & 2 deletions tests/daemon/unit/stores/test_partialstore.py
@@ -28,10 +28,12 @@ def partial_flow_store():

def test_peastore_add(partial_pea_store):
partial_store_item = partial_pea_store.add(
args=ArgNamespace.kwargs2namespace(PeaModel().dict(), set_pea_parser())
args=ArgNamespace.kwargs2namespace(PeaModel().dict(), set_pea_parser()),
envs={'key1': 'val1'},
)
assert partial_store_item
assert partial_pea_store.object
assert partial_pea_store.object.env['key1'] == 'val1'
assert partial_store_item.arguments['runtime_cls'] == 'ZEDRuntime'
assert partial_store_item.arguments['host_in'] == __default_host__
assert partial_store_item.arguments['host_out'] == __default_host__
@@ -51,8 +53,9 @@ def test_flowstore_add(monkeypatch, partial_flow_store):
flow_model = FlowModel()
flow_model.uses = f'{cur_dir}/flow.yml'
args = ArgNamespace.kwargs2namespace(flow_model.dict(), set_flow_parser())
partial_store_item = partial_flow_store.add(args)
partial_store_item = partial_flow_store.add(args, envs={'key1': 'val1'})

assert partial_flow_store.object.env['key1'] == 'val1'
assert partial_store_item
assert isinstance(partial_flow_store.object, Flow)
assert 'executor1' in partial_store_item.yaml_source
Empty file.
4 changes: 4 additions & 0 deletions tests/distributed/test_env_vars/envvars_ws2/config.yml
@@ -0,0 +1,4 @@
jtype: EnvExecutor
metas:
py_modules:
- exec.py
12 changes: 12 additions & 0 deletions tests/distributed/test_env_vars/envvars_ws2/exec.py
@@ -0,0 +1,12 @@
import os

from jina import Executor, requests


class EnvExecutor(Executor):
@requests
def set_env(self, docs, **kwargs):
for doc in docs:
doc.tags['key1'] = os.environ.get('context_var_1')
doc.tags['key2'] = os.environ.get('context_var_2')
doc.tags['replicas'] = os.environ.get('num_replicas')
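EnvExecutor simply copies three environment variables into each document's tags, which is what the new distributed test asserts on. A local smoke test of the same behaviour might look like this (assuming exec.py is importable and using DocumentArray from jina):

import os

from jina import Document, DocumentArray
from exec import EnvExecutor  # assumes exec.py is on the Python path

os.environ['context_var_1'] = 'val1'
os.environ['context_var_2'] = 'val2'
os.environ['num_replicas'] = '2'

docs = DocumentArray([Document() for _ in range(3)])
EnvExecutor().set_env(docs)  # call the request handler directly, without a Flow

assert all(d.tags['key1'] == 'val1' for d in docs)
assert all(d.tags['replicas'] == '2' for d in docs)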
10 changes: 10 additions & 0 deletions tests/distributed/test_env_vars/envvars_ws2/flow_with_env.yml
@@ -0,0 +1,10 @@
jtype: Flow
with:
port_expose: 12345
envs:
key1: ${{ context_var_1 }}
key2: ${{ context_var_2 }}
executors:
- name: env_executor
uses: config.yml
replicas: ${{ num_replicas }}
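For reference, the ${{ ... }} placeholders above are what the `envs` mapping in the new distributed test (below) is expected to resolve when the partial daemon loads this YAML; the substitution itself happens inside the daemon and is not shown here. A sketch of the mapping, with values taken from that test:

envs = {
    'context_var_1': 'val1',   # -> with.envs.key1
    'context_var_2': 'val2',   # -> with.envs.key2
    'num_replicas': '2',       # -> executors[0].replicas
}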
50 changes: 50 additions & 0 deletions tests/distributed/test_env_vars/test_remote_flow_executor.py
@@ -0,0 +1,50 @@
import os
from contextlib import contextmanager

import pytest

from daemon.clients import JinaDClient
from jina import Document, Client

NUM_DOCS = 10

cur_dir = os.path.dirname(os.path.abspath(__file__))
jinad_client = JinaDClient(host='localhost', port=8000)


@contextmanager
def RemoteFlow(filename, envs):
flow_id = None
try:
workspace_id = jinad_client.workspaces.create(
paths=[os.path.join(cur_dir, 'envvars_ws2')]
)
flow_id = jinad_client.flows.create(
workspace_id=workspace_id, filename=filename, envs=envs
)
yield
finally:
if flow_id:
assert jinad_client.flows.delete(flow_id), 'Flow termination failed'
print(f'Remote Flow {flow_id} successfully terminated')


@pytest.mark.parametrize('replicas', ['1', '2'])
def test_remote_flow_local_executors(replicas):
with RemoteFlow(
filename='flow_with_env.yml',
envs={
'context_var_1': 'val1',
'context_var_2': 'val2',
'num_replicas': replicas,
},
):
resp = Client(host='localhost', port=12345).post(
on='/',
inputs=[Document(id=idx) for idx in range(NUM_DOCS)],
return_results=True,
)
for doc in resp[0].data.docs:
assert doc.tags['key1'] == 'val1'
assert doc.tags['key2'] == 'val2'
assert doc.tags['replicas'] == replicas
@@ -13,7 +13,7 @@
def test_port_expose_env_var(port_expose, func):
jinad_client = JinaDClient(host='localhost', port=8000)
workspace_id = jinad_client.workspaces.create(
paths=[os.path.join(cur_dir, 'envvars_ws')]
paths=[os.path.join(cur_dir, 'envvars_ws1')]
)
flow_id = jinad_client.flows.create(
workspace_id=workspace_id,
