Skip to content

Commit

Permalink
fix(daemon): miscellaneous fixes for logging, pod model, flow creation (
Browse files Browse the repository at this point in the history
#1637)

* fix(logid): pass correct logid to gateway and remote pods
  • Loading branch information
deepankarm authored Jan 9, 2021
1 parent 2470c38 commit d6d14f2
Show file tree
Hide file tree
Showing 13 changed files with 53 additions and 37 deletions.
13 changes: 8 additions & 5 deletions daemon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

def _get_app():
from .api.endpoints import common_router, flow, pod, pea, logs
from .config import jinad_config, fastapi_config, server_config, openapitags_config
from .config import jinad_config, fastapi_config, openapitags_config

context = namedtuple('context', ['router', 'openapi_tags', 'tags'])
_all_routers = {
Expand Down Expand Up @@ -68,13 +68,16 @@ def _start_uvicorn(app: 'FastAPI'):
log_level='error')
server = Server(config=config)
server.run()
daemon_logger.info('bye!')
daemon_logger.info('Bye!')


def _start_fluentd():
daemon_logger.info('starting fluentd')
daemon_logger.info('Starting fluentd')
cfg = pkg_resources.resource_filename('jina', 'resources/fluent.conf')
subprocess.Popen(['fluentd', '-c', cfg])
try:
subprocess.Popen(['fluentd', '-qq', '-c', cfg])
except FileNotFoundError:
daemon_logger.warning('Fluentd not found locally, Jinad cannot stream logs!')


def _parse_arg():
Expand All @@ -86,5 +89,5 @@ def _parse_arg():

def main():
_parse_arg()
threading.Thread(target=_start_fluentd).start()
threading.Thread(target=_start_fluentd, daemon=True).start()
_start_uvicorn(app=_get_app())
12 changes: 6 additions & 6 deletions daemon/api/endpoints/flow.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json
import uuid
from typing import List, Union
from typing import List, Union, Dict

from fastapi import status, APIRouter, Body, Response, File, UploadFile
from fastapi.exceptions import HTTPException
Expand All @@ -10,6 +10,7 @@
from jina.parsers import set_client_cli_parser
from ...excepts import FlowYamlParseException, FlowCreationException, FlowStartException
from ...models import SinglePodModel
from ...models.custom import build_pydantic_model
from ...store import flow_store

router = APIRouter()
Expand All @@ -20,11 +21,10 @@
summary='Build & start a Flow using Pods',
)
async def _create_from_pods(
pods: Union[List[SinglePodModel]] = Body(...,
example=json.loads(SinglePodModel().json()))
pods: Union[List[Dict]] = Body(..., example=json.loads(SinglePodModel().json()))
):
"""
Build a Flow using a list of `PodModel`
Build a Flow using a list of `SinglePodModel`
[
{
Expand Down Expand Up @@ -220,13 +220,13 @@ async def _ping(

@router.delete(
path='/flow',
summary='Close Flow context',
summary='Close Flow Context',
)
async def _delete(
flow_id: uuid.UUID
):
"""
Close Flow context
Close Flow Context
"""
with flow_store._session():
try:
Expand Down
4 changes: 2 additions & 2 deletions daemon/api/endpoints/pea.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,12 @@ async def _create(

@router.delete(
path='/pea',
summary='Close Pea context',
summary='Close Pea Context',
)
async def _delete(
pea_id: uuid.UUID
):
"""Close Pea context
"""Close Pea Context
"""
with pea_store._session():
try:
Expand Down
4 changes: 2 additions & 2 deletions daemon/api/endpoints/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,12 @@ async def _create(

@router.delete(
path='/pod',
summary='Delete pod',
summary='Close Pod Context',
)
async def _delete(
pod_id: uuid.UUID
):
"""Close Pod context
"""Close Pod Context
"""
with pod_store._session():
try:
Expand Down
1 change: 1 addition & 0 deletions daemon/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .flow import FlowModel
from .pea import PeaModel
from .pod import SinglePodModel, ParallelPodModel
from .custom import build_pydantic_model
5 changes: 4 additions & 1 deletion daemon/models/custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ def get_pydantic_fields(config: Union[dict, argparse.ArgumentParser]):
description=arg['help'])
all_options[arg_key] = (arg_type, current_field)

# TODO(Deepankar): possible refactoring to `jina.api_to_dict()`
# Issue: For all args that have a default value which comes from a function call (get_random_identity() or random_port()),
# 1st pydantic model invocation sets these default values, means build_pydantic_model(...) sets the args, not SinglePodModel()
# In case of multiple Pods, port conflict happens because of same port set as default in both.
# TODO(Deepankar): Add support for `default_factory` for default args that are functions
if isinstance(config, argparse.ArgumentParser):
# Ignoring first 3 as they're generic args
from jina.parsers.helper import KVAppendAction
Expand Down
3 changes: 1 addition & 2 deletions daemon/models/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@

from .custom import build_pydantic_model

SinglePodModel = build_pydantic_model(model_name='PodModel',
module='pod')
SinglePodModel = build_pydantic_model(model_name='SinglePodModel', module='pod')


class ParallelPodModel(BaseModel):
Expand Down
23 changes: 13 additions & 10 deletions daemon/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@
from fastapi import UploadFile

from jina.flow import Flow
from jina.helper import colored, get_random_identity
from jina.jaml import JAML
from jina.helper import colored
from jina.enums import PodRoleType
from jina.logging import JinaLogger
from jina.peapods import Pea, Pod
from .excepts import FlowYamlParseException, FlowCreationException, \
FlowStartException, PodStartException, PeaStartException, FlowBadInputException
from .helper import create_meta_files_from_upload, delete_meta_files_from_upload
from .models import SinglePodModel
from .models import SinglePodModel, build_pydantic_model


class InMemoryStore:
Expand Down Expand Up @@ -86,16 +87,13 @@ def _create(self,
elif isinstance(config, list):
try:
flow = self._build_with_pods(pod_args=config)
with flow:
pass
except Exception as e:
self.logger.error(f'Got error while creating flows via pods: {repr(e)}')
raise FlowCreationException
else:
raise FlowBadInputException(f'Not valid Flow config input {type(config)}')

try:
flow.args.log_id = flow.args.identity if 'identity' in flow.args else get_random_identity()
flow_id = uuid.UUID(flow.args.log_id)
flow = self._start(context=flow)
except Exception as e:
Expand All @@ -109,18 +107,23 @@ def _create(self,
return flow_id, flow.host, flow.port_expose

def _build_with_pods(self,
pod_args: List[SinglePodModel]):
""" Since we rely on PodModel, this can accept all params that a Pod can accept """
pod_args: List[Dict]):
""" Since we rely on SinglePodModel, this can accept all params that a Pod can accept """
flow = Flow()
for current_pod_args in pod_args:
_current_pod_args = current_pod_args.dict()
# Hacky code here. We build `SinglePodModel` from `Dict` everytime to reset the default values
SinglePodModel = build_pydantic_model(model_name='SinglePodModel',
module='pod')
_current_pod_args = SinglePodModel(**current_pod_args).dict()

if not _current_pod_args.get('pod_role'):
_current_pod_args.update(pod_role=PodRoleType.POD)
_current_pod_args.pop('log_config')
flow = flow.add(**_current_pod_args)
return flow

def _get(self,
flow_id: uuid.UUID,
yaml_only: bool = False):
flow_id: uuid.UUID):
""" Fetches a Flow from the store """
if flow_id not in self._store:
raise KeyError(f'{flow_id} not found')
Expand Down
2 changes: 1 addition & 1 deletion extra-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ flask: devel, cicd, http, sse
flask_cors: devel, cicd, http, sse
fastapi: devel, cicd, http, test, daemon
uvicorn>=0.12.1: devel, cicd, http, test, daemon
fluent-logger: logging, http, sse, dashboard, devel, cicd, test
fluent-logger: logging, http, sse, dashboard, devel, cicd, test, daemon
nmslib>=1.6.3: index
docker: devel, cicd, network, hub, test
torch>=1.1.0: framework, cicd
Expand Down
1 change: 1 addition & 0 deletions jina/flow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ def _add_gateway(self, needs, **kwargs):
ctrl_with_ipc=True, # otherwise ctrl port would be conflicted
read_only=True,
runtime_cls='GRPCRuntime',
log_id=self.args.log_id,
pod_role=PodRoleType.GATEWAY))

kwargs.update(self._common_kwargs)
Expand Down
4 changes: 3 additions & 1 deletion jina/peapods/runtimes/jinad/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ async def async_run_forever(self):
"""
Streams log messages using websocket from remote server
"""
self._logging_task = asyncio.create_task(self.api.logstream(self._remote_id))
self._logging_task = asyncio.create_task(
self.api.logstream(remote_id=self._remote_id, log_id=self.args.log_id)
)

async def async_cancel(self):
"""
Expand Down
6 changes: 4 additions & 2 deletions jina/peapods/runtimes/jinad/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def create(self, args: Dict, **kwargs) -> Optional[str]:
except requests.exceptions.RequestException as ex:
self.logger.error(f'couldn\'t create pod with remote jinad {repr(ex)}')

async def logstream(self, remote_id: 'str'):
async def logstream(self, remote_id: 'str', log_id: 'str'):
""" websocket log stream from remote pea/pod
:param remote_id: the identity of that pea/pod
:return:
Expand Down Expand Up @@ -192,8 +192,10 @@ async def logstream(self, remote_id: 'str'):
complete_log_message = log_line[current_line_number]
log_line_dict = json.loads(complete_log_message.split('\t')[-1].strip())
name = log_line_dict['name']

if name not in remote_loggers:
remote_loggers[name] = JinaLogger(context=f'🌏 {name}')
remote_loggers[name] = JinaLogger(context=f'🌏 {name}', log_id=log_id)

# TODO(Deepankar): change logging level, process name in local logger
remote_loggers[name].info(f'{log_line_dict["message"].strip()}')
except json.decoder.JSONDecodeError:
Expand Down
12 changes: 7 additions & 5 deletions tests/jinad/unit/test_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@
import pytest
from fastapi import UploadFile

from daemon.models import SinglePodModel
from daemon.store import InMemoryPeaStore, InMemoryPodStore, InMemoryFlowStore
from jina.enums import PodRoleType
from jina.flow import Flow
from jina.parsers import set_pea_parser, set_pod_parser
from jina.peapods.pods import BasePod

cur_dir = Path(__file__).parent


def pod_list():
return [SinglePodModel(pod_role=PodRoleType.POD)]
def pod_list_one():
return [{'name': 'pod1'}]


def pod_list_multiple():
return [{'name': 'pod1'}, {'name': 'pod2'}]


def flow_file_str():
Expand All @@ -24,7 +26,7 @@ def flow_file_str():
return config_str


@pytest.mark.parametrize('config', [flow_file_str(), pod_list()])
@pytest.mark.parametrize('config', [flow_file_str(), pod_list_one(), pod_list_multiple()])
def test_flow_store(config):
store = InMemoryFlowStore()
with store._session():
Expand Down

0 comments on commit d6d14f2

Please sign in to comment.