Skip to content

Commit

Permalink
fix(jinad): ensure log is ready on creating flow (#2279)
Browse files Browse the repository at this point in the history
* fix(jinad): ensure log is ready on creating flow

* fix(jinad): implement logstream websocket in more idiomatic way

* fix(jinad): simplify logstream and prevent duplicate messages

* feat(jinad): add `timeout` parameter to logstream

* fix(jinad): apply timeout to waiting for logfile too

* fix(jinad): re-use _log_is_ready

* test(jinad): cover logs endpoints with tests

* fix: undo waiting for log

* fix: missing dep & undefined name

* fix: remove unneeded import

* fix: add docstrings to ConnectionManager class

* fix: add parameters to docstrings

* fix: docstrings
  • Loading branch information
mohamed--abdel-maksoud committed Apr 12, 2021
1 parent 2309da4 commit 2df2af5
Show file tree
Hide file tree
Showing 2 changed files with 168 additions and 40 deletions.
129 changes: 89 additions & 40 deletions daemon/api/endpoints/logs.py
Expand Up @@ -2,12 +2,15 @@
import json
import uuid
from pathlib import Path
from typing import List

from fastapi import APIRouter, WebSocket
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from fastapi.exceptions import HTTPException
from fastapi.responses import FileResponse
from starlette.endpoints import WebSocketEndpoint
from starlette.types import Receive, Scope, Send
from websockets import ConnectionClosedOK
from websockets.exceptions import ConnectionClosedError

from ... import daemon_logger, jinad_args
from ...stores.helper import get_workspace_path
Expand All @@ -27,64 +30,110 @@ async def _export_logs(workspace_id: uuid.UUID, log_id: uuid.UUID):
return FileResponse(filepath)


class LogStreamingEndpoint(WebSocketEndpoint):
def __init__(self, scope: Scope, receive: Receive, send: Send) -> None:
super().__init__(scope, receive, send)
# Accessing path / query params from scope in ASGI
# https://asgi.readthedocs.io/en/latest/specs/www.html#websocket-connection-scope
info = self.scope.get('path').split('/')
workspace_id = info[-2]
log_id = info[-1]
self.filepath = get_workspace_path(workspace_id, log_id, 'logging.log')
self.active_clients = []
def _websocket_details(websocket: WebSocket):
    """Return the remote endpoint of the websocket client as ``host:port``."""
    client = websocket.client
    return f'{client.host}:{client.port}'

async def on_connect(self, websocket: WebSocket) -> None:
await websocket.accept()

self.client_details = f'{websocket.client.host}:{websocket.client.port}'
self.active_clients.append(websocket)
daemon_logger.info(f'{self.client_details} is connected to stream logs!')
class ConnectionManager:
    """
    Manager of websockets listening for a log stream.
    TODO for now contains a single connection. Ideally there must be one
    manager per log with a thread checking for updates in log and broadcasting
    to active connections
    """

    def __init__(self):
        """Instantiate a ConnectionManager."""
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """
        Register a new websocket.

        :param websocket: websocket to register
        """
        await websocket.accept()
        daemon_logger.info(
            '%s is connected to stream logs!' % _websocket_details(websocket)
        )
        self.active_connections.append(websocket)

    async def disconnect(self, websocket: WebSocket):
        """
        Disconnect a websocket.

        Idempotent: calling it again for an already-removed websocket is a
        no-op instead of raising ``ValueError`` from ``list.remove``.

        :param websocket: websocket to disconnect
        """
        if websocket not in self.active_connections:
            return
        self.active_connections.remove(websocket)
        await websocket.close()
        daemon_logger.info('%s is disconnected' % _websocket_details(websocket))

    async def broadcast(self, message: dict):
        """
        Send a json message to all registered websockets.

        :param message: JSON-serializable message to be broadcast
        """
        daemon_logger.debug('connections: %r', self.active_connections)
        # iterate over a snapshot: `disconnect` mutates `active_connections`,
        # and removing items from a list while iterating it skips elements
        for connection in list(self.active_connections):
            try:
                await connection.send_json(message)
            except ConnectionClosedOK:
                # clean close from the peer; nothing to do
                pass
            except ConnectionClosedError:
                await self.disconnect(connection)


@router.websocket('/logstream/{workspace_id}/{log_id}')
async def _logstream(
    websocket: WebSocket, workspace_id: uuid.UUID, log_id: uuid.UUID, timeout: int = 0
):
    """
    Tail the fluentd log file of a workspace/flow and stream each JSON line
    to the connected websocket.

    :param websocket: client websocket to stream to
    :param workspace_id: workspace containing the log
    :param log_id: id of the log to stream
    :param timeout: max seconds to wait, both for the logfile to appear and
        for new lines once streaming; 0 means wait forever
    """
    manager = ConnectionManager()
    await manager.connect(websocket)
    client_details = _websocket_details(websocket)
    filepath = get_workspace_path(workspace_id, log_id, 'logging.log')
    try:
        if jinad_args.no_fluentd:
            daemon_logger.warning(
                f'{client_details} asks for logstreaming but fluentd is not available'
            )
            return

        # on connection the fluentd file may not be flushed (aka exist) yet
        n = 0
        while not Path(filepath).is_file():
            daemon_logger.debug(f'still waiting {filepath} to be ready...')
            await asyncio.sleep(1)
            n += 1
            if timeout > 0 and n >= timeout:
                return

        daemon_logger.success(f'{filepath} is ready for streaming')

        with open(filepath) as fp:
            fp.seek(0, 2)  # tail the file: start at its current end
            delay = 0.1
            n = 0  # counts consecutive empty polls, for the timeout
            while True:
                line = fp.readline()  # also possible to read an empty line
                if line:
                    daemon_logger.debug('sending line %s', line)
                    payload = None
                    try:
                        payload = json.loads(line)
                    except json.decoder.JSONDecodeError:
                        daemon_logger.warning(f'JSON decode error on {line}')

                    if payload:
                        await manager.broadcast(payload)
                        n = 0
                else:
                    await asyncio.sleep(delay)
                    n += 1
                    if timeout > 0 and n >= timeout / delay:
                        return
    except WebSocketDisconnect:
        # client went away mid-stream; cleanup happens in `finally` —
        # previously `disconnect` ran here AND in `finally`, so the second
        # call raised ValueError from list.remove
        pass
    finally:
        await manager.disconnect(websocket)
79 changes: 79 additions & 0 deletions tests/daemon/unit/api/endpoints/test_logs.py
@@ -0,0 +1,79 @@
import os
import json
import time
from daemon.stores.helper import get_workspace_path
from daemon.api.endpoints import logs
from daemon import jinad_args


# Four sample jina JSON log lines used to seed the test log file; the second
# line contains ANSI colour escape sequences (\u001B[33m ... \u001B[0m).
log_content = """
{"host":"ubuntu","process":"32539","type":"INFO","name":"encode1","uptime":"20210124215151","context":"encode1","workspace_path":"/tmp/jinad/32aa7734-fbb8-4e7a-9f76-46221b512648","log_id":"16ef0bd7-e534-42e7-9076-87a3f585933c","message":"starting jina.peapods.runtimes.zmq.zed.ZEDRuntime..."}
{"host":"ubuntu","process":"32539","type":"INFO","name":"encode1","uptime":"20210124215151","context":"encode1/ZEDRuntime","workspace_path":"/tmp/jinad/32aa7734-fbb8-4e7a-9f76-46221b512648","log_id":"16ef0bd7-e534-42e7-9076-87a3f585933c","message":"input \u001B[33mtcp://0.0.0.0:45319\u001B[0m (PULL_BIND) output \u001B[33mtcp://0.0.0.0:59229\u001B[0m (PUSH_CONNECT) control over \u001B[33mtcp://0.0.0.0:49571\u001B[0m (PAIR_BIND)"}
{"host":"ubuntu","process":"31612","type":"SUCCESS","name":"encode1","uptime":"20210124215151","context":"encode1","workspace_path":"/tmp/jinad/32aa7734-fbb8-4e7a-9f76-46221b512648","log_id":"16ef0bd7-e534-42e7-9076-87a3f585933c","message":"ready and listening"}
{"host":"ubuntu","process":"32546","type":"INFO","name":"encode2","uptime":"20210124215151","context":"encode2","workspace_path":"/tmp/jinad/32aa7734-fbb8-4e7a-9f76-46221b512648","log_id":"16ef0bd7-e534-42e7-9076-87a3f585933c","message":"starting jina.peapods.runtimes.zmq.zed.ZEDRuntime..."}
"""

# Fixed ids used to build the workspace/flow log paths exercised below.
workspace_id = 'acd2fa50-02a6-452d-add7-ff987810c741'
flow_id = 'f0f90a3d-d2d9-4646-96d5-2560886f6ef0'
# An id that matches no workspace or flow on disk, for the 404 cases.
nonexisting_id = 'fff90a3d-d2d9-4646-96d5-2560886ffff0'


def _write_to_logfile(content, append=False):
    """
    Write *content* to the test flow's log file.

    :param content: string to write
    :param append: if True append to the existing file, else truncate first
    """
    with open(
        get_workspace_path(workspace_id, flow_id, 'logging.log'),
        'a' if append else 'w+',
    ) as f:
        # `writelines` on a str iterates it character by character;
        # `write` produces the same file content in a single call
        f.write(content)


def setup_module():
    """Create the workspace directory and seed the log file before any test runs."""
    workspace_dir = get_workspace_path(workspace_id, flow_id)
    print('setup', workspace_dir)
    os.makedirs(workspace_dir, exist_ok=True)
    _write_to_logfile(log_content)


def test_logs_invalid_workspace(fastapi_client):
    """Requesting logs for an unknown workspace id must return 404."""
    url = f'/logs/{nonexisting_id}/{flow_id}'
    resp = fastapi_client.get(url)
    assert resp.status_code == 404


def test_logs_invalid_flow(fastapi_client):
    """Requesting logs for an unknown flow id must return 404."""
    url = f'/logs/{workspace_id}/{nonexisting_id}'
    resp = fastapi_client.get(url)
    assert resp.status_code == 404


def test_logs_wrong_order(fastapi_client):
    """Swapping workspace and flow ids in the path must return 404."""
    url = f'/logs/{flow_id}/{workspace_id}'
    resp = fastapi_client.get(url)
    assert resp.status_code == 404


def test_logs_correct_log(fastapi_client):
    """Fetching an existing log returns 200 and the exact file content."""
    resp = fastapi_client.get(f'/logs/{workspace_id}/{flow_id}')
    assert resp.status_code == 200
    assert resp.text == log_content


def test_logstream_missing(fastapi_client):
    """Streaming a non-existent log must time out without delivering a message."""
    received = None
    # fixed: `exception_raised` was never initialized, so the final assert
    # raised NameError (instead of failing cleanly) when no exception occurred
    exception_raised = False
    with fastapi_client.websocket_connect(
        f'/logstream/{workspace_id}/{nonexisting_id}?timeout=3'
    ) as websocket:
        try:
            received = websocket.receive_json()
        except Exception:
            exception_raised = True
    assert received is None
    assert exception_raised


def test_logstream_valid(fastapi_client):
    """A line appended to the log while streaming is pushed to the websocket as JSON."""
    line = '{"host":"test-host","process":"12034","type":"INFO","name":"Flow","uptime":"2021-04-02T20:58:10.819138","context":"Flow","workspace_path":"...","log_id":"...","message":"1 Pods (i.e. 1 Peas) are running in this Flow"}\n'
    received = None
    with fastapi_client.websocket_connect(
        f'/logstream/{workspace_id}/{flow_id}?timeout=3'
    ) as websocket:
        # give the endpoint a moment to open and seek to the end of the file
        time.sleep(0.25)
        _write_to_logfile(line, True)
        received = websocket.receive_json()
    assert received is not None
    assert received == json.loads(line)

0 comments on commit 2df2af5

Please sign in to comment.