Skip to content

Commit

Permalink
fix(daemon): manage ports for remote flows (#3266)
Browse files Browse the repository at this point in the history
* fix(daemon): load and dump flow config

* style: fix overload and cli autocomplete

* feat: fetch gateway from host network

* fix: filename in partiald

* test: remote flow containerized executor

* refactor: move runtime_cls to helper

* fix(daemon): ports for compound pod

* fix(daemon): ports for compound pod

* fix: port_expose changes for flow

* test: dependency check with grpc req

Co-authored-by: Jina Dev Bot <dev-bot@jina.ai>
  • Loading branch information
deepankarm and jina-bot committed Aug 30, 2021
1 parent aa0d167 commit ca7375a
Show file tree
Hide file tree
Showing 28 changed files with 538 additions and 139 deletions.
114 changes: 95 additions & 19 deletions daemon/api/dependencies.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
import os
from pathlib import Path
from http import HTTPStatus
from contextlib import ExitStack
from collections import defaultdict
from typing import Dict, List, Optional

from pydantic import FilePath
from pydantic.errors import PathNotAFileError
from fastapi import HTTPException, UploadFile, File, Query, Depends

from jina import __docker_host__, Flow
from jina.enums import PeaRoleType, SocketType, RemoteWorkspaceState
from jina.helper import cached_property, random_port
from jina import Flow, __docker_host__
from jina.helper import cached_property
from jina.peapods import CompoundPod
from jina.peapods.peas.helper import update_runtime_cls
from jina.enums import (
PeaRoleType,
SocketType,
RemoteWorkspaceState,
)
from .. import daemon_logger
from ..models import DaemonID, FlowModel, PodModel, PeaModel
from ..models import DaemonID, FlowModel, PodModel, PeaModel, GATEWAY_RUNTIME_DICT
from ..helper import get_workspace_path, change_cwd, change_env
from ..stores import workspace_store as store

Expand Down Expand Up @@ -58,11 +66,11 @@ def __init__(
uses=self.filename, workspace_id=self.workspace_id.jid, identity=self.id
)
self.envs = envs.vars
self.validate()
self._ports = {}
self.load_and_dump()

def localpath(self) -> Path:
"""
Validates local filepath in workspace from filename.
"""Validates local filepath in workspace from filename.
Raise 404 if filepath doesn't exist in workspace.
:return: filepath for flow yaml
Expand All @@ -77,27 +85,95 @@ def localpath(self) -> Path:
detail=f'File `{self.filename}` not found in workspace `{self.workspace_id}`',
)

@cached_property
def port_expose(self) -> str:
@property
def newfile(self) -> str:
"""return newfile path in format
`<root-workspace>/<workspace-id>/<flow-id>_<original-filename>`
:return: return filepath to save flow config in
"""
Sets `port_expose` for the Flow started in `partial-daemon`.
This port needs to be exposed before starting `partial-daemon`, hence set here.
If env vars are passed, set them in current env, to make sure all values are replaced.
Before loading the flow yaml, change CWD to workspace dir.
return get_workspace_path(self.workspace_id, f'{self.id}_{self.filename}')

:return: port_expose
def load_and_dump(self) -> None:
"""
Every Flow created inside JinaD lives inside a container. It is important to know the
list of ports to be published on localhost before actually starting the container.
1. `load` the flow yaml here.
- yaml is stored in `workspace` directory, so we'll `cd` there
- yaml might include env vars. so we'll set them (passed via query params)
2. `build` the Flow so that `gateway` gets added.
- get the list of ports to be published (port_expose, port_in, port_out, port_ctrl)
- ports need to be published for gateway & executors that are not `ContainerRuntime` or `JinadRuntime` based
- note that, Pod level args for ports are enough, as we don't need to publish Pea ports
- all the above Pods also run in docker, hence we set `runs_in_docker`
3. `save` the Flow config.
- saves port configs of all `executors` into the new yaml.
- set `JINA_FULL_CLI` envvar, so that `gateway` args are also added.
- save the config into a new file.
4. pass this new file as filename to `partial-daemon` to start the Flow
"""
with ExitStack() as stack:

port_args = ['port_in', 'port_out', 'port_ctrl']
port_mapping = defaultdict(dict)

# set env vars
stack.enter_context(change_env('JINA_FULL_CLI', 'true'))
if self.envs:
for key, val in self.envs.items():
stack.enter_context(change_env(key, val))

# change directory to `workspace`
stack.enter_context(change_cwd(get_workspace_path(self.workspace_id)))
f = Flow.load_config(str(self.localpath()))
return f.port_expose or random_port()

def validate(self) -> None:
"""Validates and sets arguments to be used in store"""
self.ports = {f'{self.port_expose}/tcp': self.port_expose}
# load and build
f: Flow = Flow.load_config(str(self.localpath())).build()

# get & set the ports mapping, set `runs_in_docker`
port_mapping['gateway']['port_expose'] = f.port_expose
for pod_name, pod in f._pod_nodes.items():
runtime_cls = update_runtime_cls(pod.args, copy=True).runtime_cls
if runtime_cls in ['ZEDRuntime'] + list(GATEWAY_RUNTIME_DICT.values()):
if isinstance(pod, CompoundPod):
# For a `CompoundPod`, publish ports only for head Pea & tail Pea
# Check daemon.stores.partial.PartialFlowStore.add() for additional logic
# to handle `CompoundPod` inside partial-daemon.
for pea_args in [pod.head_args, pod.tail_args]:
pea_args.runs_in_docker = False
for port_arg in port_args:
port_mapping[pea_args.name][port_arg] = getattr(
pea_args, port_arg
)
else:
pod.args.runs_in_docker = True
for port_arg in port_args:
port_mapping[pod_name][port_arg] = getattr(
pod.args, port_arg
)
self.ports = port_mapping

# save to a new file & set it for partial-daemon
f.save_config(filename=self.newfile)
self.params.uses = os.path.basename(self.newfile)

@property
def ports(self) -> Dict[str, int]:
    """Return the flattened port mapping built by the setter.

    :return: dict of ``'<port>/tcp'`` -> ``<port>`` for every port to be mapped
    """
    return self._ports


@ports.setter
def ports(self, port_mapping: Dict[str, Dict[str, int]]) -> None:
    """Flatten a per-pod port mapping into a single ``'<port>/tcp'`` dict.

    The incoming mapping is keyed by pod/pea name (e.g. ``'gateway'``), each value
    being a dict of port-argument name (``port_expose``, ``port_in``, ...) to port
    number. Names are dropped; only the port numbers are kept. Identical port
    numbers collapse into a single entry.

    :param port_mapping: mapping of ``name -> {port_arg: port}``
    """
    self._ports = {
        f'{port}/tcp': port
        for named_ports in port_mapping.values()
        for port in named_ports.values()
        # an unset port would otherwise produce a meaningless 'None/tcp' key
        if port is not None
    }


class PeaDepends:
Expand Down
1 change: 0 additions & 1 deletion daemon/api/endpoints/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ async def _create(flow: FlowDepends = Depends(FlowDepends)):
params=flow.params,
ports=flow.ports,
envs=flow.envs,
port_expose=flow.port_expose,
)
except Exception as ex:
raise Runtime400Exception from ex
Expand Down
4 changes: 2 additions & 2 deletions daemon/api/endpoints/partial/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ async def _status():
status_code=201,
response_model=PartialFlowItem,
)
async def _create(flow: 'FlowModel', port_expose: int):
async def _create(flow: 'FlowModel'):
"""
.. #noqa: DAR101
.. #noqa: DAR201"""
try:
args = ArgNamespace.kwargs2namespace(flow.dict(), set_flow_parser())
return store.add(args, port_expose)
return store.add(args)
except Exception as ex:
raise PartialDaemon400Exception from ex

Expand Down
8 changes: 8 additions & 0 deletions daemon/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from pydantic import BaseModel

from jina.enums import GatewayProtocolType
from .containers import ContainerItem, ContainerStoreStatus
from .custom import build_pydantic_model
from .id import DaemonID
Expand All @@ -13,6 +14,13 @@
PeaModel = build_pydantic_model(model_name='PeaModel', module='pea')


# Maps each gateway protocol (`GatewayProtocolType`) to the runtime class name
# associated with it. The values are plain class-name strings, so they can be
# compared directly against a resolved `runtime_cls` name.
GATEWAY_RUNTIME_DICT = {
    GatewayProtocolType.GRPC: 'GRPCRuntime',
    GatewayProtocolType.WEBSOCKET: 'WebSocketRuntime',
    GatewayProtocolType.HTTP: 'HTTPRuntime',
}


class DaemonStatus(BaseModel):
"""Pydantic model for DaemonStatus"""

Expand Down
50 changes: 28 additions & 22 deletions daemon/stores/containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@

from jina import __docker_host__
from jina.helper import colored, random_port
from jina.enums import RemoteWorkspaceState
from .base import BaseStore
from ..dockerize import Dockerizer
from ..excepts import (
PartialDaemon400Exception,
Runtime400Exception,
PartialDaemonConnectionException,
)
from ..helper import if_alive, id_cleaner, error_msg_from
Expand All @@ -38,23 +38,25 @@ class ContainerStore(BaseStore):
_status_model = ContainerStoreStatus

async def _add(self, uri, *args, **kwargs):
"""Implements jina object creation in `mini-jinad`
"""Implements jina object creation in `partial-daemon`
.. #noqa: DAR101"""
raise NotImplementedError

@if_alive
async def _update(self, uri: str, params: Dict, **kwargs) -> Dict:
"""Sends `PUT` request to `mini-jinad` to execute a command on a Flow.
"""Sends `PUT` request to `partial-daemon` to execute a command on a Flow.
:param uri: uri of mini-jinad
:param uri: uri of partial-daemon
:param params: json payload to be sent
:param kwargs: keyword args
:raises PartialDaemon400Exception: if update fails
:return: response from mini-jinad
:return: response from partial-daemon
"""

self._logger.debug(f'sending PUT request to mini-jinad on {uri}/{self._kind}')
self._logger.debug(
f'sending PUT request to partial-daemon on {uri}/{self._kind}'
)
async with aiohttp.request(
method='PUT', url=f'{uri}/{self._kind}', params=params
) as response:
Expand All @@ -64,16 +66,16 @@ async def _update(self, uri: str, params: Dict, **kwargs) -> Dict:
return response_json

async def _delete(self, uri, *args, **kwargs):
"""Implements jina object termination in `mini-jinad`
"""Implements jina object termination in `partial-daemon`
.. #noqa: DAR101"""
raise NotImplementedError

async def ready(self, uri) -> bool:
"""Check if the container with mini-jinad is alive
"""Check if the container with partial-daemon is alive
:param uri: uri of mini-jinad
:return: True if mini-jinad is ready"""
:param uri: uri of partial-daemon
:return: True if partial-daemon is ready"""
async with aiohttp.ClientSession() as session:
for _ in range(20):
try:
Expand All @@ -88,15 +90,15 @@ async def ready(self, uri) -> bool:
continue
except Exception as e:
self._logger.error(
f'error while checking if mini-jinad is ready: {e}'
f'error while checking if partial-daemon is ready: {e}'
)
self._logger.error(
f'couldn\'t reach {self._kind.title()} container at {uri} after 10secs'
)
return False

def _uri(self, port: int) -> str:
"""Returns uri of mini-jinad.
"""Returns uri of partial-daemon.
NOTE: JinaD (running inside a container) needs to access other containers via dockerhost.
Mac/WSL: this would work as is, as dockerhost is accessible.
Expand All @@ -105,7 +107,7 @@ def _uri(self, port: int) -> str:
NOTE: Checks if we actually are in docker (needed for unit tests). If not docker, use localhost.
:param port: partial-daemon port
:return: uri for mini-jinad
:return: uri for partial-daemon
"""

if (
Expand All @@ -118,14 +120,14 @@ def _uri(self, port: int) -> str:
return f'http://{__docker_host__}:{port}'

def _command(self, port: int, workspace_id: DaemonID) -> str:
"""Returns command for mini-jinad container to be appended to default entrypoint
"""Returns command for partial-daemon container to be appended to default entrypoint
NOTE: `command` is appended to already existing entrypoint, hence removed the prefix `jinad`
NOTE: Important to set `workspace_id` here as this gets set in jina objects in the container
:param port: mini jinad port
:param port: partial-daemon port
:param workspace_id: workspace id
:return: command for mini-jinad container
:return: command for partial-daemon container
"""
return f'--port {port} --mode {self._kind} --workspace-id {workspace_id.jid}'

Expand All @@ -147,7 +149,7 @@ async def add(
:param ports: ports to be mapped to local
:param envs: dict of env vars to be passed
:param kwargs: keyword args
:raises KeyError: if workspace_id doesn't exist in the store
:raises KeyError: if workspace_id doesn't exist in the store or not ACTIVE
:raises PartialDaemonConnectionException: if jinad cannot connect to partial
:return: id of the container
"""
Expand All @@ -156,11 +158,15 @@ async def add(

if workspace_id not in workspace_store:
raise KeyError(f'{workspace_id} not found in workspace store')
elif workspace_store[workspace_id].state != RemoteWorkspaceState.ACTIVE:
raise KeyError(
f'{workspace_id} is not ACTIVE yet. Please retry once it becomes ACTIVE'
)

minid_port = random_port()
ports.update({f'{minid_port}/tcp': minid_port})
uri = self._uri(minid_port)
command = self._command(minid_port, workspace_id)
partiald_port = random_port()
ports.update({f'{partiald_port}/tcp': partiald_port})
uri = self._uri(partiald_port)
command = self._command(partiald_port, workspace_id)
params = params.dict(exclude={'log_config'})

self._logger.debug(
Expand Down Expand Up @@ -215,7 +221,7 @@ async def add(
uri=uri,
),
arguments=ContainerArguments(
command=command,
command=f'jinad {command}',
object=object,
),
workspace_id=workspace_id,
Expand Down
20 changes: 10 additions & 10 deletions daemon/stores/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,21 @@ class FlowStore(ContainerStore):
_kind = 'flow'

@if_alive
async def _add(self, uri: str, port_expose: int, params: Dict, **kwargs) -> Dict:
"""Sends `POST` request to `mini-jinad` to create a Flow.
async def _add(self, uri: str, params: Dict, **kwargs) -> Dict:
"""Sends `POST` request to `partial-daemon` to create a Flow.
:param uri: uri of mini-jinad
:param port_expose: port expose for container flow
:param uri: uri of partial-daemon
:param params: json payload to be sent
:param kwargs: keyword args
:raises PartialDaemon400Exception: if creation fails
:return: response from mini-jinad
:return: response from partial-daemon
"""
self._logger.debug(f'sending POST request to mini-jinad on {uri}/{self._kind}')
self._logger.debug(
f'sending POST request to partial-daemon on {uri}/{self._kind}'
)
async with aiohttp.request(
method='POST',
url=f'{uri}/{self._kind}',
params={'port_expose': str(port_expose)},
json=params,
) as response:
response_json = await response.json()
Expand All @@ -40,13 +40,13 @@ async def _add(self, uri: str, port_expose: int, params: Dict, **kwargs) -> Dict
async def _delete(self, uri, **kwargs) -> Dict:
"""Sends a `DELETE` request to terminate the Flow & remove the container
:param uri: uri of mini-jinad
:param uri: uri of partial-daemon
:param kwargs: keyword args
:raises PartialDaemon400Exception: if deletion fails
:return: response from mini-jinad
:return: response from partial-daemon
"""
self._logger.debug(
f'sending DELETE request to mini-jinad on {uri}/{self._kind}'
f'sending DELETE request to partial-daemon on {uri}/{self._kind}'
)
async with aiohttp.request(
method='DELETE', url=f'{uri}/{self._kind}'
Expand Down
Loading

0 comments on commit ca7375a

Please sign in to comment.