Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(jinad): log streaming from remote #1584

Merged
merged 7 commits into from
Jan 5, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 0 additions & 4 deletions jina/excepts.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,6 @@ class BadNamedScoreType(TypeError):
""" Exception when can not construct a named score from the given data """


class RemotePodClosed(Exception):
""" Exception when remote pod is closed and log streaming needs to exit """


class LengthMismatchException(Exception):
""" Exception when length of two items should be identical while not """

Expand Down
16 changes: 15 additions & 1 deletion jina/peapods/runtimes/jinad/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import argparse
from multiprocessing import Event
from typing import Union, Dict, Optional

from .api import get_jinad_api
Expand All @@ -10,17 +11,30 @@ class JinadRuntime(ZMQManyRuntime):

def __init__(self, args: Union['argparse.Namespace', Dict]):
super().__init__(args)
self.exit_event = Event()
self.exit_event.clear()
self.api = get_jinad_api(kind=self.remote_type,
host=self.host,
port=self.port_expose,
logger=self.logger)

def setup(self):
# Uploads Pod/Pea context to remote & Creates remote Pod/Pea using :class:`JinadAPI`
if self._remote_id:
self.logger.success(f'created remote {self.api.kind} with id {colored(self._remote_id, "cyan")}')

def run_forever(self):
self.api.log(self._remote_id, None)
# Streams log messages using websocket from remote server.
# Waits for an `asyncio.Event` to be set to exit out of streaming loop
self.api.log(remote_id=self._remote_id, event=self.exit_event)

def cancel(self):
# Indicates :meth:`run_forever` to exit by setting the `asyncio.Event`
self.exit_event.set()

def teardown(self):
# Closes the remote Pod/Pea using :class:`JinadAPI`
self.api.delete(remote_id=self._remote_id)

@cached_property
def _remote_id(self) -> Optional[str]:
Expand Down
58 changes: 29 additions & 29 deletions jina/peapods/runtimes/jinad/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@
import asyncio
from pathlib import Path
from contextlib import ExitStack
from multiprocessing.synchronize import Event
from multiprocessing import Event
from typing import Dict, Tuple, Set, List, Optional

from ....jaml import JAML
from ....logging import JinaLogger
from ....enums import RemotePeapodType
from ....excepts import RemotePodClosed
from ....importer import ImportExtensions


Expand Down Expand Up @@ -69,6 +68,7 @@ def fetch_files_from_yaml(pea_args: Dict, logger: 'JinaLogger') -> Tuple[Set[str

class JinadAPI:
kind = 'pea' # select from pea/pod, TODO: enum
TIMEOUT_ERROR_CODE = 4000

def __init__(self,
host: str,
Expand Down Expand Up @@ -98,7 +98,7 @@ def __init__(self,
self.upload_url = f'{rest_url}/upload'
self.pea_url = f'{rest_url}/pea'
self.pod_url = f'{rest_url}/pod'
self.log_url = f'{websocket_url}/wslog'
self.log_url = f'{websocket_url}/logstream'

@property
def is_alive(self) -> bool:
Expand Down Expand Up @@ -162,61 +162,61 @@ def create(self, args: Dict, **kwargs) -> Optional[str]:
except requests.exceptions.RequestException as ex:
self.logger.error(f'couldn\'t create pod with remote jinad {repr(ex)}')

async def wslogs(self, remote_id: 'str', stop_event: Event, current_line: int = 0):
async def logstream(self, remote_id: 'str', event: Event):
""" websocket log stream from remote pea/pod
:param remote_id: the identity of that pea/pod
:param stop_event: the multiprocessing event which marks if stop event is set
:param current_line: the line number from which logs would be streamed
:param event: the multiprocessing event which marks if stop event is set
:return:
"""
with ImportExtensions(required=True):
import websockets

remote_loggers = {}
try:
# sleeping for few seconds to allow the logs to be written in remote
await asyncio.sleep(3)
async with websockets.connect(f'{self.log_url}/{remote_id}?timeout=20') as websocket:
await websocket.send(json.dumps({'from': current_line}))
remote_loggers = {}
while True:
log_line = await websocket.recv()
if log_line:

async with websockets.connect(f'{self.log_url}/{remote_id}?timeout=5') as websocket:
current_line_number = -1

while not event.is_set():
self.logger.warning(f'fetching logs from line# {int(current_line_number) + 1}, event: {event}')
await websocket.send(json.dumps({'from': int(current_line_number) + 1}))
async for log_line in websocket:
try:
log_line = json.loads(log_line)
current_line = int(list(log_line.keys())[0])
log_line_dict = list(log_line.values())[0]
log_line_dict = json.loads(log_line_dict.split('\t')[-1].strip())
if 'code' in log_line and log_line['code'] == self.TIMEOUT_ERROR_CODE:
self.logger.info(f'Received timeout from the log server. Breaking')
break
current_line_number = list(log_line.keys())[0]
complete_log_message = log_line[current_line_number]
log_line_dict = json.loads(complete_log_message.split('\t')[-1].strip())
name = log_line_dict['name']
if name not in remote_loggers:
remote_loggers[name] = JinaLogger(context=f'🌏 {name}')
# TODO: change logging level, process name in local logger
# TODO(Deepankar): change logging level, process name in local logger
remote_loggers[name].info(f'{log_line_dict["message"].strip()}')
except json.decoder.JSONDecodeError:
continue
await websocket.send(json.dumps({}))
if stop_event.is_set():
for logger in remote_loggers.values():
logger.close()
raise RemotePodClosed
except websockets.exceptions.ConnectionClosedOK:
self.logger.debug(f'Client got disconnected from server')
return current_line
except websockets.exceptions.WebSocketException as e:
self.logger.error(f'Got following error while streaming logs via websocket {repr(e)}')
return 0
finally:
if remote_loggers:
for logger in remote_loggers.values():
logger.close()

def log(self, remote_id: 'str', stop_event: Event, **kwargs) -> None:
def log(self, remote_id: 'str', event: Event, **kwargs) -> None:
""" Start the log stream from remote pea/pod, will use local logger for output
:param remote_id: the identity of that pea/pod
:return:
"""
try:
self.logger.info(f'fetching streamed logs from remote id: {remote_id}')
asyncio.run(self.wslogs(remote_id=remote_id, stop_event=stop_event, current_line=0))
except RemotePodClosed:
self.logger.debug(f'🌏 remote closed')
self.logger.info(f'🌏 Fetching streamed logs from remote id: {remote_id}, event: {event}')
asyncio.run(self.logstream(remote_id=remote_id, event=event))
finally:
self.logger.info(f'🌏 exiting from remote logger')
self.logger.info(f'🌏 Exiting from remote logger')

def delete(self, remote_id: 'str', **kwargs) -> bool:
""" Delete a remote pea/pod
Expand Down
7 changes: 4 additions & 3 deletions tests/integration/jinad/Dockerfiles/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ WORKDIR /jina/
ADD setup.py MANIFEST.in requirements.txt extra-requirements.txt README.md ./
ADD cli ./cli/
ADD jina ./jina/
ADD tests/integration/jinad/Dockerfiles/entrypoint.sh ./

ARG PIP_TAG=[test]

RUN chmod +x entrypoint.sh
RUN pip install ."$PIP_TAG"

# This doesn't start fluentd in the background
# add entrypoint script if fluentd needs to be enabled for tests
ENTRYPOINT ["jinad"]
# This starts both jinad & fluentd in the background
ENTRYPOINT ["bash", "-c", "./entrypoint.sh"]
12 changes: 12 additions & 0 deletions tests/integration/jinad/Dockerfiles/entrypoint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#!/bin/bash

CONF_PATH=$(python3 -c "import pkg_resources; print(pkg_resources.resource_filename('jina', 'resources/fluent.conf'))")

# Start fluentd in the background
nohup fluentd -c $CONF_PATH &

# Allowing fluentd conf to load by sleeping for 2secs
sleep 2

# Start jinad (uvicorn) server
jinad
4 changes: 2 additions & 2 deletions tests/integration/jinad/test_index_query/test_integration.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ if [ "${PWD##*/}" != "jina" ]
exit 1
fi

docker-compose -f tests/integration/jinad/test_index_query/docker-compose.yml --project-directory . up --build -d
docker-compose -f tests/integration/jinad/test_index_query/docker-compose.yml --project-directory . up --build -d --remove-orphans

sleep 10
#Indexing part
Expand Down Expand Up @@ -43,7 +43,7 @@ echo "document matched has the text: ${TEXT_INDEXED}"
curl -s --request GET "http://0.0.0.0:8000/v1/flow/${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code"
curl -s --request DELETE "http://0.0.0.0:8000/v1/flow?flow_id=${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code"

docker-compose -f tests/integration/jinad/test_index_query/docker-compose.yml --project-directory . down
docker-compose -f tests/integration/jinad/test_index_query/docker-compose.yml --project-directory . down --remove-orphans

EXPECTED_TEXT='"text:hey, dude"'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ if [ "${PWD##*/}" != "jina" ]
exit 1
fi

docker-compose -f tests/integration/jinad/test_index_query_with_shards/docker-compose.yml --project-directory . up --build -d

docker-compose -f tests/integration/jinad/test_index_query_with_shards/docker-compose.yml --project-directory . up --build -d --remove-orphans

sleep 10
#Indexing part
Expand Down Expand Up @@ -58,7 +59,7 @@ echo "found ${COUNT} matches"
curl -s --request GET "http://0.0.0.0:8000/v1/flow/${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code"
curl -s --request DELETE "http://0.0.0.0:8000/v1/flow?flow_id=${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code"

docker-compose -f tests/integration/jinad/test_index_query/docker-compose.yml --project-directory . down
docker-compose -f tests/integration/jinad/test_index_query/docker-compose.yml --project-directory . down --remove-orphans

if [ $COUNT = 10 ]; then
echo "Success"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ if [ "${PWD##*/}" != "jina" ]
exit 1
fi

docker-compose -f tests/integration/jinad/test_simple_distributed/docker-compose.yml --project-directory . up --build -d
docker-compose -f tests/integration/jinad/test_simple_distributed/docker-compose.yml --project-directory . up --build -d --remove-orphans

sleep 10

Expand All @@ -27,7 +27,7 @@ curl -s --request GET "http://0.0.0.0:8000/v1/flow/${FLOW_ID}" -H "accept: appli

curl -s --request DELETE "http://0.0.0.0:8000/v1/flow?flow_id=${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code"

docker-compose -f tests/integration/jinad/test_simple_distributed/docker-compose.yml --project-directory . down
docker-compose -f tests/integration/jinad/test_simple_distributed/docker-compose.yml --project-directory . down --remove-orphans

EXPECTED_TEXT='"text:hey, dude"'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ if [ "${PWD##*/}" != "jina" ]
exit 1
fi

docker-compose -f tests/integration/jinad/test_simple_distributed_with_shards/docker-compose.yml --project-directory . up --build -d
docker-compose -f tests/integration/jinad/test_simple_distributed_with_shards/docker-compose.yml --project-directory . up --build -d --remove-orphans

sleep 10

Expand All @@ -27,7 +27,7 @@ curl -s --request GET "http://0.0.0.0:8000/v1/flow/${FLOW_ID}" -H "accept: appli

curl -s --request DELETE "http://0.0.0.0:8000/v1/flow?flow_id=${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code"

docker-compose -f tests/integration/jinad/test_simple_distributed_with_shards/docker-compose.yml --project-directory . down
docker-compose -f tests/integration/jinad/test_simple_distributed_with_shards/docker-compose.yml --project-directory . down --remove-orphans

EXPECTED_TEXT='"text:hey, dude"'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ if [ "${PWD##*/}" != "jina" ]
exit 1
fi

docker-compose -f tests/integration/jinad/test_simple_hub_pods/docker-compose.yml --project-directory . up --build -d
docker-compose -f tests/integration/jinad/test_simple_hub_pods/docker-compose.yml --project-directory . up --build -d --remove-orphans

sleep 10

Expand All @@ -30,7 +30,7 @@ curl -s --request GET "http://0.0.0.0:8000/v1/flow/${FLOW_ID}" -H "accept: appli

curl -s --request DELETE "http://0.0.0.0:8000/v1/flow?flow_id=${FLOW_ID}" -H "accept: application/json" | jq -e ".status_code"

docker-compose -f tests/integration/jinad/test_simple_hub_pods/docker-compose.yml --project-directory . down
docker-compose -f tests/integration/jinad/test_simple_hub_pods/docker-compose.yml --project-directory . down --remove-orphans

EXPECTED_TEXT='"text:hey, dude"'

Expand Down