diff --git a/daemon/api/endpoints/logs.py b/daemon/api/endpoints/logs.py index 611fff0a26c5c..d3ccb2e92f7b9 100644 --- a/daemon/api/endpoints/logs.py +++ b/daemon/api/endpoints/logs.py @@ -2,12 +2,15 @@ import json import uuid from pathlib import Path +from typing import List -from fastapi import APIRouter, WebSocket +from fastapi import APIRouter, WebSocket, WebSocketDisconnect from fastapi.exceptions import HTTPException from fastapi.responses import FileResponse from starlette.endpoints import WebSocketEndpoint from starlette.types import Receive, Scope, Send +from websockets import ConnectionClosedOK +from websockets.exceptions import ConnectionClosedError from ... import daemon_logger, jinad_args from ...stores.helper import get_workspace_path @@ -27,41 +30,95 @@ async def _export_logs(workspace_id: uuid.UUID, log_id: uuid.UUID): return FileResponse(filepath) -class LogStreamingEndpoint(WebSocketEndpoint): - def __init__(self, scope: Scope, receive: Receive, send: Send) -> None: - super().__init__(scope, receive, send) - # Accessing path / query params from scope in ASGI - # https://asgi.readthedocs.io/en/latest/specs/www.html#websocket-connection-scope - info = self.scope.get('path').split('/') - workspace_id = info[-2] - log_id = info[-1] - self.filepath = get_workspace_path(workspace_id, log_id, 'logging.log') - self.active_clients = [] +def _websocket_details(websocket: WebSocket): + return f'{websocket.client.host}:{websocket.client.port}' - async def on_connect(self, websocket: WebSocket) -> None: - await websocket.accept() - self.client_details = f'{websocket.client.host}:{websocket.client.port}' - self.active_clients.append(websocket) - daemon_logger.info(f'{self.client_details} is connected to stream logs!') +class ConnectionManager: + """ + Manager of websockets listening for a log stream. + + TODO for now contain a single connection. 
Ideally there must be one + manager per log with a thread checking for updates in log and broadcasting + to active connections + """ + + def __init__(self): + """Instantiate a ConnectionManager.""" + self.active_connections: List[WebSocket] = [] + async def connect(self, websocket: WebSocket): + """ + Register a new websocket. + + :param websocket: websocket to register + """ + await websocket.accept() + daemon_logger.info( + '%s is connected to stream logs!' % _websocket_details(websocket) + ) + self.active_connections.append(websocket) + + async def disconnect(self, websocket: WebSocket): + """ + Disconnect a websocket. + + :param websocket: websocket to disconnect + """ + self.active_connections.remove(websocket) + await websocket.close() + daemon_logger.info('%s is disconnected' % _websocket_details(websocket)) + + async def broadcast(self, message: dict): + """ + Send a json message to all registered websockets. + + :param message: JSON-serializable message to be broadcast + """ + daemon_logger.debug('connections: %r', self.active_connections) + for connection in self.active_connections: + try: + await connection.send_json(message) + except ConnectionClosedOK: + pass + except ConnectionClosedError: + await self.disconnect(connection) + + +@router.websocket('/logstream/{workspace_id}/{log_id}') +async def _logstream( + websocket: WebSocket, workspace_id: uuid.UUID, log_id: uuid.UUID, timeout: int = 0 +): + manager = ConnectionManager() + await manager.connect(websocket) + client_details = _websocket_details(websocket) + filepath = get_workspace_path(workspace_id, log_id, 'logging.log') + try: if jinad_args.no_fluentd: daemon_logger.warning( - f'{self.client_details} asks for logstreaming but fluentd is not available' + f'{client_details} asks for logstreaming but fluentd is not available' ) return # on connection the fluentd file may not flushed (aka exist) yet - while not Path(self.filepath).is_file(): - daemon_logger.debug(f'still waiting {self.filepath} to 
be ready...') + n = 0 + while not Path(filepath).is_file(): + daemon_logger.debug(f'still waiting {filepath} to be ready...') await asyncio.sleep(1) + n += 1 + if timeout > 0 and n >= timeout: + return + + daemon_logger.success(f'{filepath} is ready for streaming') - with open(self.filepath) as fp: + with open(filepath) as fp: fp.seek(0, 2) - daemon_logger.success(f'{self.filepath} is ready for streaming') - while websocket in self.active_clients: + delay = 0.1 + n = 0 + while True: line = fp.readline() # also possible to read an empty line if line: + daemon_logger.debug('sending line %s', line) payload = None try: payload = json.loads(line) @@ -69,22 +126,14 @@ async def on_connect(self, websocket: WebSocket) -> None: daemon_logger.warning(f'JSON decode error on {line}') if payload: - from websockets import ConnectionClosedOK - - try: - await websocket.send_json(payload) - except ConnectionClosedOK: - break + await manager.broadcast(payload) + n = 0 else: - await asyncio.sleep(0.1) - - async def on_disconnect(self, websocket: WebSocket, close_code: int) -> None: - self.active_clients.remove(websocket) - daemon_logger.info(f'{self.client_details} is disconnected') - - -# TODO: adding websocket in this way do not generate any docs -# see: https://github.com/tiangolo/fastapi/issues/1983 -router.add_websocket_route( - path='/logstream/{workspace_id}/{log_id}', endpoint=LogStreamingEndpoint -) + await asyncio.sleep(delay) + n += 1 + if timeout > 0 and n >= timeout / delay: + return + except WebSocketDisconnect: + await manager.disconnect(websocket) + finally: + await manager.disconnect(websocket) diff --git a/tests/daemon/unit/api/endpoints/test_logs.py b/tests/daemon/unit/api/endpoints/test_logs.py new file mode 100644 index 0000000000000..5503560813189 --- /dev/null +++ b/tests/daemon/unit/api/endpoints/test_logs.py @@ -0,0 +1,79 @@ +import os +import json +import time +from daemon.stores.helper import get_workspace_path +from daemon.api.endpoints import logs +from 
daemon import jinad_args + + +log_content = """ +{"host":"ubuntu","process":"32539","type":"INFO","name":"encode1","uptime":"20210124215151","context":"encode1","workspace_path":"/tmp/jinad/32aa7734-fbb8-4e7a-9f76-46221b512648","log_id":"16ef0bd7-e534-42e7-9076-87a3f585933c","message":"starting jina.peapods.runtimes.zmq.zed.ZEDRuntime..."} +{"host":"ubuntu","process":"32539","type":"INFO","name":"encode1","uptime":"20210124215151","context":"encode1/ZEDRuntime","workspace_path":"/tmp/jinad/32aa7734-fbb8-4e7a-9f76-46221b512648","log_id":"16ef0bd7-e534-42e7-9076-87a3f585933c","message":"input \u001B[33mtcp://0.0.0.0:45319\u001B[0m (PULL_BIND) output \u001B[33mtcp://0.0.0.0:59229\u001B[0m (PUSH_CONNECT) control over \u001B[33mtcp://0.0.0.0:49571\u001B[0m (PAIR_BIND)"} +{"host":"ubuntu","process":"31612","type":"SUCCESS","name":"encode1","uptime":"20210124215151","context":"encode1","workspace_path":"/tmp/jinad/32aa7734-fbb8-4e7a-9f76-46221b512648","log_id":"16ef0bd7-e534-42e7-9076-87a3f585933c","message":"ready and listening"} +{"host":"ubuntu","process":"32546","type":"INFO","name":"encode2","uptime":"20210124215151","context":"encode2","workspace_path":"/tmp/jinad/32aa7734-fbb8-4e7a-9f76-46221b512648","log_id":"16ef0bd7-e534-42e7-9076-87a3f585933c","message":"starting jina.peapods.runtimes.zmq.zed.ZEDRuntime..."} +""" + +workspace_id = 'acd2fa50-02a6-452d-add7-ff987810c741' +flow_id = 'f0f90a3d-d2d9-4646-96d5-2560886f6ef0' +nonexisting_id = 'fff90a3d-d2d9-4646-96d5-2560886ffff0' + + +def _write_to_logfile(content, append=False): + with open( + get_workspace_path(workspace_id, flow_id, 'logging.log'), + 'a' if append else 'w+', + ) as f: + f.writelines(content) + + +def setup_module(): + print('setup', get_workspace_path(workspace_id, flow_id)) + os.makedirs(get_workspace_path(workspace_id, flow_id), exist_ok=True) + _write_to_logfile(log_content) + + +def test_logs_invalid_workspace(fastapi_client): + response = 
fastapi_client.get(f'/logs/{nonexisting_id}/{flow_id}') + assert response.status_code == 404 + + +def test_logs_invalid_flow(fastapi_client): + response = fastapi_client.get(f'/logs/{workspace_id}/{nonexisting_id}') + assert response.status_code == 404 + + +def test_logs_wrong_order(fastapi_client): + response = fastapi_client.get(f'/logs/{flow_id}/{workspace_id}') + assert response.status_code == 404 + + +def test_logs_correct_log(fastapi_client): + response = fastapi_client.get(f'/logs/{workspace_id}/{flow_id}') + assert response.status_code == 200 + assert response.text == log_content + + +def test_logstream_missing(fastapi_client): + received = None + with fastapi_client.websocket_connect( + f'/logstream/{workspace_id}/{nonexisting_id}?timeout=3' + ) as websocket: + try: + received = websocket.receive_json() + except Exception: + exception_raised = True + assert received is None + assert exception_raised + + +def test_logstream_valid(fastapi_client): + line = '{"host":"test-host","process":"12034","type":"INFO","name":"Flow","uptime":"2021-04-02T20:58:10.819138","context":"Flow","workspace_path":"...","log_id":"...","message":"1 Pods (i.e. 1 Peas) are running in this Flow"}\n' + received = None + with fastapi_client.websocket_connect( + f'/logstream/{workspace_id}/{flow_id}?timeout=3' + ) as websocket: + time.sleep(0.25) + _write_to_logfile(line, True) + received = websocket.receive_json() + assert received is not None + assert received == json.loads(line)