From d3a0869c1b041d646b7dfe5919d0329c85db525e Mon Sep 17 00:00:00 2001 From: Zac Li Date: Mon, 22 May 2023 12:11:59 +0800 Subject: [PATCH 01/10] feat: unified duration and add counter --- lcserve/backend/gateway.py | 52 +++++++++------------ tests/integration/helper.py | 22 ++++++++- tests/integration/local/test_basic_app.py | 22 +++++++-- tests/integration/local/test_fastapi_app.py | 22 +++++++-- 4 files changed, 76 insertions(+), 42 deletions(-) diff --git a/lcserve/backend/gateway.py b/lcserve/backend/gateway.py index d04c0f8b..027345a7 100644 --- a/lcserve/backend/gateway.py +++ b/lcserve/backend/gateway.py @@ -343,23 +343,21 @@ def _setup_metrics(self): meter_provider=self.meter_provider, ) - self.http_duration_counter = self.meter.create_counter( - name="http_request_duration_seconds", - description="HTTP request duration in seconds", + self.duration_counter = self.meter.create_counter( + name="lcserve_request_duration_seconds", + description="Lc-serve Request duration in seconds", unit="s", ) - self.ws_duration_counter = self.meter.create_counter( - name="ws_request_duration_seconds", - description="WS request duration in seconds", - unit="s", + self.request_counter = self.meter.create_counter( + name="lcserve_request_count", + description="Lc-serve Request count", ) self.app.add_middleware( - MeasureDurationHTTPMiddleware, counter=self.http_duration_counter - ) - self.app.add_middleware( - MeasureDurationWebSocketMiddleware, counter=self.ws_duration_counter + MetricsMiddleware, + duration_counter=self.duration_counter, + request_counter=self.request_counter, ) def _register_healthz(self): @@ -1005,7 +1003,6 @@ async def send_duration_periodically( while True: await asyncio.sleep(self.interval) current_time = time.perf_counter() - duration = current_time - shared_data.last_reported_time if counter: counter.add( current_time - shared_data.last_reported_time, {"route": route} @@ -1014,13 +1011,16 @@ async def send_duration_periodically( shared_data.last_reported_time = current_time -class BaseMeasureDurationMiddleware: +class MetricsMiddleware: def __init__( - self, app: ASGIApp, scope_type: str, counter: Optional['Counter'] = None + self, + app: ASGIApp, + duration_counter: Optional['Counter'] = None, + request_counter: Optional['Counter'] = None, ): self.app = app - self.scope_type = scope_type - self.counter = counter + self.duration_counter = duration_counter + self.request_counter = request_counter # TODO: figure out solution for static assets self.skip_routes = [ '/docs', @@ -1033,32 +1033,24 @@ def __init__( ] async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: - if scope["type"] == self.scope_type and scope['path'] not in self.skip_routes: + if scope['path'] not in self.skip_routes: timer = Timer(5) shared_data = timer.SharedData(last_reported_time=time.perf_counter()) send_duration_task = asyncio.create_task( timer.send_duration_periodically( - shared_data, scope['path'], self.counter + shared_data, scope['path'], self.duration_counter ) ) try: await self.app(scope, receive, send) finally: send_duration_task.cancel() - if self.counter: - self.counter.add( + if self.duration_counter: + self.duration_counter.add( time.perf_counter() - shared_data.last_reported_time, {"route": scope['path']}, ) + if self.request_counter: + self.request_counter.add(1, {"route": scope['path']}) else: await self.app(scope, receive, send) - - -class MeasureDurationHTTPMiddleware(BaseMeasureDurationMiddleware): - def __init__(self, app: ASGIApp, counter: Optional['Counter'] = None): - super().__init__(app, "http", counter) - - -class MeasureDurationWebSocketMiddleware(BaseMeasureDurationMiddleware): - def __init__(self, app: ASGIApp, counter: Optional['Counter'] = None): - super().__init__(app, "websocket", counter) diff --git a/tests/integration/helper.py b/tests/integration/helper.py index 0120735e..945c6997 100644 --- a/tests/integration/helper.py +++ b/tests/integration/helper.py @@ -161,7 +161,7 @@ def get_values_from_prom(metrics, route): return duration_seconds -def examine_prom_with_retry(start_time, metrics, expected_value, route): +def examine_request_duration_with_retry(start_time, expected_value, route): timeout = 120 interval = 10 @@ -170,8 +170,26 @@ def examine_prom_with_retry(start_time, metrics, expected_value, route): if elapsed_time > timeout: pytest.fail("Timed out waiting for the Prometheus data to be populated") - duration_seconds = get_values_from_prom(metrics, route) + duration_seconds = get_values_from_prom( + "lcserve_request_duration_seconds", route + ) if round(float(duration_seconds)) == expected_value: break time.sleep(interval) + + +def examine_request_count_with_retry(start_time, expected_value, route): + timeout = 120 + interval = 10 + + while True: + elapsed_time = time.time() - start_time + if elapsed_time > timeout: + pytest.fail("Timed out waiting for the Prometheus data to be populated") + + request_count = get_values_from_prom("lcserve_request_count", route) + if float(request_count) == expected_value: + break + + time.sleep(interval) diff --git a/tests/integration/local/test_basic_app.py b/tests/integration/local/test_basic_app.py index 5e3d18f6..8d213db4 100644 --- a/tests/integration/local/test_basic_app.py +++ b/tests/integration/local/test_basic_app.py @@ -6,7 +6,11 @@ import requests import websockets -from ..helper import examine_prom_with_retry, run_test_app_locally +from ..helper import ( + examine_request_count_with_retry, + examine_request_duration_with_retry, + run_test_app_locally, +) HOST = "localhost:8080" HTTP_HOST = f"http://{HOST}" @@ -257,12 +261,16 @@ def test_metrics_http(run_test_app_locally, route): assert response.status_code == 200 start_time = time.time() - examine_prom_with_retry( + examine_request_duration_with_retry( start_time, - metrics="http_request_duration_seconds", expected_value=5, route="/" + route, ) + examine_request_count_with_retry( + start_time, + expected_value=1, + route="/" + route, + ) @pytest.mark.asyncio @@ -283,9 +291,13 @@ async def test_metrics_ws(run_test_app_locally, route): assert received_messages == ["0", "1", "2", "3", "4"] start_time = time.time() - examine_prom_with_retry( + examine_request_duration_with_retry( start_time, - metrics="ws_request_duration_seconds", expected_value=5, route="/" + route, ) + examine_request_count_with_retry( + start_time, + expected_value=1, + route="/" + route, + ) diff --git a/tests/integration/local/test_fastapi_app.py b/tests/integration/local/test_fastapi_app.py index 9711c755..de379e49 100644 --- a/tests/integration/local/test_fastapi_app.py +++ b/tests/integration/local/test_fastapi_app.py @@ -7,7 +7,11 @@ import requests import websockets -from ..helper import examine_prom_with_retry, run_fastapi_app_locally +from ..helper import ( + examine_request_count_with_retry, + examine_request_duration_with_retry, + run_fastapi_app_locally, +) HOST = "localhost:8080" HTTP_HOST = f"http://{HOST}" @@ -82,12 +86,16 @@ def test_metrics_http(run_fastapi_app_locally, route): assert response.status_code == 200 start_time = time.time() - examine_prom_with_retry( + examine_request_duration_with_retry( start_time, - metrics="http_request_duration_seconds", expected_value=5, route="/" + route, ) + examine_request_count_with_retry( + start_time, + expected_value=1, + route="/" + route, + ) @pytest.mark.asyncio @@ -108,9 +116,13 @@ async def test_metrics_ws(run_fastapi_app_locally, route): assert received_messages == ["0", "1", "2", "3", "4"] start_time = time.time() - examine_prom_with_retry( + examine_request_duration_with_retry( start_time, - metrics="ws_request_duration_seconds", expected_value=5, route="/" + route, ) + examine_request_count_with_retry( + start_time, + expected_value=1, + route="/" + route, + ) From 4e25f9e6d8883333c0896f553162e63e45f5d721 Mon Sep 17 00:00:00 2001 From: Zac Li Date: Mon, 22 May 2023 12:34:52 +0800 Subject: [PATCH 02/10] fix: add protocol metadata --- lcserve/backend/gateway.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/lcserve/backend/gateway.py b/lcserve/backend/gateway.py index 027345a7..e60a14bd 100644 --- a/lcserve/backend/gateway.py +++ b/lcserve/backend/gateway.py @@ -998,14 +998,19 @@ def __init__(self, interval: int): self.interval = interval async def send_duration_periodically( - self, shared_data: SharedData, route: str, counter: Optional['Counter'] = None + self, + shared_data: SharedData, + route: str, + protocol: str, + counter: Optional['Counter'] = None, ): while True: await asyncio.sleep(self.interval) current_time = time.perf_counter() if counter: counter.add( - current_time - shared_data.last_reported_time, {"route": route} + current_time - shared_data.last_reported_time, + {'route': route, 'protocol': protocol}, ) shared_data.last_reported_time = current_time @@ -1038,7 +1043,7 @@ async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: shared_data = timer.SharedData(last_reported_time=time.perf_counter()) send_duration_task = asyncio.create_task( timer.send_duration_periodically( - shared_data, scope['path'], self.duration_counter + shared_data, scope['path'], scope['type'], self.duration_counter ) ) try: @@ -1048,9 +1053,11 @@ async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: if self.duration_counter: self.duration_counter.add( time.perf_counter() - shared_data.last_reported_time, - {"route": scope['path']}, + {'route': scope['path'], 'protocol': scope['type']}, ) if self.request_counter: - self.request_counter.add(1, {"route": scope['path']}) + self.request_counter.add( + 1, {'route': scope['path'], 'protocol': scope['type']} + ) else: await self.app(scope, receive, send) From 311ee29d002d493c890b6b81776cd162be9ee86a Mon Sep 17 00:00:00 2001 From: Zac Li Date: Tue, 23 May 2023 10:51:18 +0800 Subject: [PATCH 03/10] feat: logging middleware --- lcserve/backend/gateway.py | 89 +++++++++++++++++++++++++++++++++++++- 1 file changed, 88 insertions(+), 1 deletion(-) diff --git a/lcserve/backend/gateway.py b/lcserve/backend/gateway.py index e60a14bd..cbffa533 100644 --- a/lcserve/backend/gateway.py +++ b/lcserve/backend/gateway.py @@ -4,8 +4,8 @@ import shutil import sys import time +import uuid from enum import Enum -from functools import wraps from importlib import import_module from importlib.util import module_from_spec, spec_from_file_location from pathlib import Path @@ -290,6 +290,7 @@ def __init__( self._register_healthz() self._register_modules() self._setup_metrics() + self._setup_logging() @property def app(self) -> 'FastAPI': @@ -360,6 +361,9 @@ def _setup_metrics(self): request_counter=self.request_counter, ) + def _setup_logging(self): + self.app.add_middleware(LoggingMiddleware, logger=self.logger) + def _register_healthz(self): @self.app.get("/healthz") async def __healthz(): @@ -1061,3 +1065,86 @@ async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: ) else: await self.app(scope, receive, send) + + +class LoggingMiddleware: + def __init__(self, app: ASGIApp, logger: JinaLogger): + self.app = app + self.logger = logger + self.skip_routes = [ + '/docs', + '/redoc', + '/openapi.json', + '/healthz', + '/dry_run', + '/metrics', + '/favicon.ico', + ] + + async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: + if scope['path'] not in self.skip_routes: + # Get IP address, use X-Forwarded-For if set else use scope['client'][0] + ip_address = scope.get('client')[0] if scope.get('client') else None + if scope.get('headers'): + for header in scope['headers']: + if header[0].decode('latin-1') == 'x-forwarded-for': + ip_address = header[1].decode('latin-1') + break + + # Init the request/connection ID + request_id = connection_id = None + if scope["type"] == "http": + request_id = uuid.uuid4() + elif scope["type"] == "websocket": + connection_id = uuid.uuid4() + + original_send = send + is_accepted = False + status_code = None + start_time = time.perf_counter() + + async def custom_send(message: dict) -> None: + nonlocal is_accepted + nonlocal status_code + + if request_id and message.get('type') == 'http.response.start': + message.setdefault('headers', []).append( + (b'X-API-Request-ID', str(request_id).encode()) + ) + status_code = message.get('status') + print(status_code) + elif message["type"] == "websocket.accept": + is_accepted = True + + await original_send(message) + + # Ensure that the websocket.send message containing the connection ID is only sent once, + # and only after the websocket.accept message + if is_accepted and message.get('type') not in [ + 'websocket.send', + 'websocket.close', + ]: + await original_send( + { + "type": "websocket.send", + "text": f"connection_id:{connection_id}", + } + ) + is_accepted = False + + await self.app(scope, receive, custom_send) + + end_time = time.perf_counter() + duration = round(end_time - start_time, 2) + + if scope["type"] == "http": + self.logger.info( + f"HTTP response id: {request_id} - Path: {scope['path']} - Client IP: {ip_address} - Status code: {status_code} - Duration: {duration}" + ) + elif scope["type"] == "websocket": + self.logger.info( + f"WebSocket disconnection id: {connection_id} - Path: {scope['path']} - Client IP: {ip_address} - Duration: {duration}" + ) + + else: + await self.app(scope, receive, send) From 74c10657f41c6b2bfe89714742cd867498f3a7b2 Mon Sep 17 00:00:00 2001 From: Zac Li Date: Tue, 23 May 2023 11:06:04 +0800 Subject: [PATCH 04/10] test: fix --- tests/integration/jcloud/test_basic_app.py | 6 +++--- tests/integration/jcloud/test_fastapi_app.py | 9 ++++----- tests/integration/local/test_basic_app.py | 16 ++++++++-------- tests/integration/local/test_fastapi_app.py | 6 +++--- 4 files changed, 18 insertions(+), 19 deletions(-) diff --git a/tests/integration/jcloud/test_basic_app.py b/tests/integration/jcloud/test_basic_app.py index c47dd1f5..f6cffa82 100644 --- a/tests/integration/jcloud/test_basic_app.py +++ b/tests/integration/jcloud/test_basic_app.py @@ -38,11 +38,11 @@ async def _test_ws_route(app_id): await websocket.send(json.dumps({"interval": 1})) received_messages = [] - for _ in range(5): + for _ in range(6): message = await websocket.recv() received_messages.append(message) - assert received_messages == ["0", "1", "2", "3", "4"] + assert received_messages[1:] == ["0", "1", "2", "3", "4"] async def _test_workspace(app_id): @@ -63,6 +63,6 @@ async def _test_workspace(app_id): message = await websocket.recv() received_messages.append(message.strip()) - assert received_messages == [f"Here's string {i}" for i in range(10)] + assert received_messages[1:] == [f"Here's string {i}" for i in range(10)] except ConnectionClosedOK: pass diff --git a/tests/integration/jcloud/test_fastapi_app.py b/tests/integration/jcloud/test_fastapi_app.py index 74d0e9b7..8cf0bae6 100644 --- a/tests/integration/jcloud/test_fastapi_app.py +++ b/tests/integration/jcloud/test_fastapi_app.py @@ -1,11 +1,12 @@ import json +import aiohttp import pytest import requests -import aiohttp from ..helper import deploy_jcloud_fastapi_app + @pytest.mark.asyncio async def test_basic_app(): async with deploy_jcloud_fastapi_app() as app_id: @@ -18,9 +19,7 @@ def _test_http_route(app_id): "accept": "application/json", "Content-Type": "application/json", } - response = requests.get( - f"https://{app_id}.wolf.jina.ai/status", headers=headers - ) + response = requests.get(f"https://{app_id}.wolf.jina.ai/status", headers=headers) response_data = response.json() @@ -35,4 +34,4 @@ async def _test_ws_route(app_id): received_messages = [] async for message in websocket: received_messages.append(message.data) - assert received_messages == ["0", "1", "2", "3", "4"] + assert received_messages[1:] == ["0", "1", "2", "3", "4"] diff --git a/tests/integration/local/test_basic_app.py b/tests/integration/local/test_basic_app.py index a5c513cc..fa5db24c 100644 --- a/tests/integration/local/test_basic_app.py +++ b/tests/integration/local/test_basic_app.py @@ -47,11 +47,11 @@ async def test_basic_app_ws(run_test_app_locally, route): await websocket.send(json.dumps({"interval": 1})) received_messages = [] - for _ in range(5): + for _ in range(6): message = await websocket.recv() received_messages.append(message) - assert received_messages == ["0", "1", "2", "3", "4"] + assert received_messages[1:] == ["0", "1", "2", "3", "4"] @pytest.mark.parametrize( @@ -136,11 +136,11 @@ async def test_basic_app_ws_authorized(run_test_app_locally, route): await websocket.send(json.dumps({"interval": 1})) received_messages = [] - for _ in range(5): + for _ in range(6): message = await websocket.recv() received_messages.append(message) - assert received_messages == ["0", "1", "2", "3", "4"] + assert received_messages[1:] == ["0", "1", "2", "3", "4"] @pytest.mark.parametrize( @@ -284,11 +284,11 @@ async def test_metrics_ws(run_test_app_locally, route): await websocket.send(json.dumps({"interval": 1})) received_messages = [] - for _ in range(5): + for _ in range(6): message = await websocket.recv() received_messages.append(message) - assert received_messages == ["0", "1", "2", "3", "4"] + assert received_messages[1:] == ["0", "1", "2", "3", "4"] start_time = time.time() examine_request_duration_with_retry( @@ -322,8 +322,8 @@ async def test_workspace(run_test_app_locally): await websocket.send(json.dumps({})) received_messages = [] - for _ in range(10): + for _ in range(11): message = await websocket.recv() received_messages.append(message.strip()) - assert received_messages == [f"Here's string {i}" for i in range(10)] + assert received_messages[1:] == [f"Here's string {i}" for i in range(10)] diff --git a/tests/integration/local/test_fastapi_app.py b/tests/integration/local/test_fastapi_app.py index de379e49..c1162821 100644 --- a/tests/integration/local/test_fastapi_app.py +++ b/tests/integration/local/test_fastapi_app.py @@ -68,7 +68,7 @@ async def test_websocket_endpoint(run_fastapi_app_locally, route): received_messages = [] async for message in websocket: received_messages.append(message.data) - assert received_messages == ["0", "1", "2", "3", "4"] + assert received_messages[1:] == ["0", "1", "2", "3", "4"] @pytest.mark.parametrize( @@ -109,11 +109,11 @@ async def test_metrics_ws(run_fastapi_app_locally, route): await websocket.send(json.dumps({"interval": 1})) received_messages = [] - for _ in range(5): + for _ in range(6): message = await websocket.recv() received_messages.append(message) - assert received_messages == ["0", "1", "2", "3", "4"] + assert received_messages[1:] == ["0", "1", "2", "3", "4"] start_time = time.time() examine_request_duration_with_retry( From edc461c5c8d849cdc415ce04f66236025e700009 Mon Sep 17 00:00:00 2001 From: Zac Li Date: Tue, 23 May 2023 11:40:48 +0800 Subject: [PATCH 05/10] build: upgrade python version to workaround langchain issue --- .github/workflows/ci.yml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4aad6400..f70a6396 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -19,10 +19,10 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2.5.0 - - name: Set up Python 3.9 + - name: Set up Python 3.10 uses: actions/setup-python@v4 with: - python-version: 3.9 + python-version: '3.10' - name: Lint with flake8 run: | pip install flake8 @@ -37,10 +37,10 @@ jobs: - uses: actions/checkout@v2.5.0 with: fetch-depth: 0 - - name: Set up Python 3.9 + - name: Set up Python 3.10 uses: actions/setup-python@v4 with: - python-version: 3.9 + python-version: '3.10' - id: file_changes uses: Ana06/get-changed-files@v1.2 - name: check black @@ -73,7 +73,7 @@ jobs: strategy: fail-fast: false matrix: - python-version: [3.9] + python-version: ['3.10'] test-path: ${{fromJson(needs.prep-testbed.outputs.matrix-unit)}} steps: - uses: actions/checkout@v2 @@ -102,7 +102,7 @@ jobs: strategy: fail-fast: false matrix: - python-version: [3.9] + python-version: ['3.10'] test-path: ${{fromJson(needs.prep-testbed.outputs.matrix-integration)}} steps: - uses: actions/checkout@v2 From d32f0a11958188fd36b5dc475e6c0e171af7adbe Mon Sep 17 00:00:00 2001 From: Zac Li Date: Tue, 23 May 2023 11:50:03 +0800 Subject: [PATCH 06/10] build: upgrade python version to workaround langchain issue --- .github/workflows/jcloud-integration.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/jcloud-integration.yml b/.github/workflows/jcloud-integration.yml index 39546563..7cbb065f 100644 --- a/.github/workflows/jcloud-integration.yml +++ b/.github/workflows/jcloud-integration.yml @@ -30,7 +30,7 @@ jobs: strategy: fail-fast: false matrix: - python-version: [3.9] + python-version: ['3.10'] test-path: ${{fromJson(needs.prep-testbed.outputs.matrix)}} steps: - uses: actions/checkout@v2 From 1efc1d950dd14a6de347ad6ee12ac1910cd4a4c1 Mon Sep 17 00:00:00 2001 From: Zac Li Date: Tue, 23 May 2023 12:28:25 +0800 Subject: [PATCH 07/10] build: revert py upgrade and pin problematic libs --- .github/workflows/ci.yml | 12 ++++++------ .github/workflows/jcloud-integration.yml | 2 +- requirements.txt | 2 ++ 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f70a6396..4aad6400 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -19,10 +19,10 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2.5.0 - - name: Set up Python 3.10 + - name: Set up Python 3.9 uses: actions/setup-python@v4 with: - python-version: '3.10' + python-version: 3.9 - name: Lint with flake8 run: | pip install flake8 @@ -37,10 +37,10 @@ jobs: - uses: actions/checkout@v2.5.0 with: fetch-depth: 0 - - name: Set up Python 3.10 + - name: Set up Python 3.9 uses: actions/setup-python@v4 with: - python-version: '3.10' + python-version: 3.9 - id: file_changes uses: Ana06/get-changed-files@v1.2 - name: check black @@ -73,7 +73,7 @@ jobs: strategy: fail-fast: false matrix: - python-version: ['3.10'] + python-version: [3.9] test-path: ${{fromJson(needs.prep-testbed.outputs.matrix-unit)}} steps: - uses: actions/checkout@v2 @@ -102,7 +102,7 @@ jobs: strategy: fail-fast: false matrix: - python-version: ['3.10'] + python-version: [3.9] test-path: ${{fromJson(needs.prep-testbed.outputs.matrix-integration)}} steps: - uses: actions/checkout@v2 diff --git a/.github/workflows/jcloud-integration.yml b/.github/workflows/jcloud-integration.yml index 7cbb065f..39546563 100644 --- a/.github/workflows/jcloud-integration.yml +++ b/.github/workflows/jcloud-integration.yml @@ -30,7 +30,7 @@ jobs: strategy: fail-fast: false matrix: - python-version: ['3.10'] + python-version: [3.9] test-path: ${{fromJson(needs.prep-testbed.outputs.matrix)}} steps: - uses: actions/checkout@v2 diff --git a/requirements.txt b/requirements.txt index e6a4bd3a..bf71c11e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,3 +7,5 @@ jina-hubble-sdk nest-asyncio textual toml +typing-inspect==0.8.0 +typing_extensions==4.5.0 From 85a313dc00ea8a12d617e7b23e3f971d9e143b75 Mon Sep 17 00:00:00 2001 From: Zac Li Date: Tue, 23 May 2023 12:30:55 +0800 Subject: [PATCH 08/10] build: revert py upgrade and pin problematic libs --- requirements.txt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/requirements.txt b/requirements.txt index bf71c11e..f546f709 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,5 +7,7 @@ jina-hubble-sdk nest-asyncio textual toml +# Below libs caused issue https://github.com/hwchase17/langchain/issues/5113, +# can unpin once it's resolved typing-inspect==0.8.0 typing_extensions==4.5.0 From 9c69055b04dcba9b9498ac3ad20b302f7ba5fa6a Mon Sep 17 00:00:00 2001 From: Zac Li Date: Tue, 23 May 2023 13:41:21 +0800 Subject: [PATCH 09/10] fix: undo connection client print --- .github/workflows/jcloud-integration.yml | 1 + lcserve/backend/gateway.py | 38 +++++--------------- tests/integration/jcloud/test_basic_app.py | 6 ++-- tests/integration/jcloud/test_fastapi_app.py | 2 +- tests/integration/local/test_basic_app.py | 16 ++++----- tests/integration/local/test_fastapi_app.py | 6 ++-- 6 files changed, 24 insertions(+), 45 deletions(-) diff --git a/.github/workflows/jcloud-integration.yml b/.github/workflows/jcloud-integration.yml index 39546563..41e63fac 100644 --- a/.github/workflows/jcloud-integration.yml +++ b/.github/workflows/jcloud-integration.yml @@ -36,6 +36,7 @@ jobs: - uses: actions/checkout@v2 with: fetch-depth: 0 + ref: ${{ github.event.inputs.branch }} - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v2 with: diff --git a/lcserve/backend/gateway.py b/lcserve/backend/gateway.py index 8fb52e3c..8ed0cfc7 100644 --- a/lcserve/backend/gateway.py +++ b/lcserve/backend/gateway.py @@ -1127,58 +1127,36 @@ async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: break # Init the request/connection ID - request_id = connection_id = None - if scope["type"] == "http": - request_id = uuid.uuid4() - elif scope["type"] == "websocket": - connection_id = uuid.uuid4() + request_id = str(uuid.uuid4()) if scope["type"] == "http" else None + connection_id = str(uuid.uuid4()) if scope["type"] == "websocket" else None - original_send = send - is_accepted = False status_code = None start_time = time.perf_counter() async def custom_send(message: dict) -> None: - nonlocal is_accepted nonlocal status_code + # TODO: figure out a way to do the same for ws if request_id and message.get('type') == 'http.response.start': message.setdefault('headers', []).append( (b'X-API-Request-ID', str(request_id).encode()) ) status_code = message.get('status') - print(status_code) - elif message["type"] == "websocket.accept": - is_accepted = True - - await original_send(message) - - # Ensure that the websocket.send message containing the connection ID is only sent once, - # and only after the websocket.accept message - if is_accepted and message.get('type') not in [ - 'websocket.send', - 'websocket.close', - ]: - await original_send( - { - "type": "websocket.send", - "text": f"connection_id:{connection_id}", - } - ) - is_accepted = False + + await send(message) await self.app(scope, receive, custom_send) end_time = time.perf_counter() - duration = round(end_time - start_time, 2) + duration = round(end_time - start_time, 3) if scope["type"] == "http": self.logger.info( - f"HTTP response id: {request_id} - Path: {scope['path']} - Client IP: {ip_address} - Status code: {status_code} - Duration: {duration}" + f"HTTP request: {request_id} - Path: {scope['path']} - Client IP: {ip_address} - Status code: {status_code} - Duration: {duration} s" ) elif scope["type"] == "websocket": self.logger.info( - f"WebSocket disconnection id: {connection_id} - Path: {scope['path']} - Client IP: {ip_address} - Duration: {duration}" + f"WebSocket connection: {connection_id} - Path: {scope['path']} - Client IP: {ip_address} - Duration: {duration}" ) else: diff --git a/tests/integration/jcloud/test_basic_app.py b/tests/integration/jcloud/test_basic_app.py index f6cffa82..c47dd1f5 100644 --- a/tests/integration/jcloud/test_basic_app.py +++ b/tests/integration/jcloud/test_basic_app.py @@ -38,11 +38,11 @@ async def _test_ws_route(app_id): await websocket.send(json.dumps({"interval": 1})) received_messages = [] - for _ in range(6): + for _ in range(5): message = await websocket.recv() received_messages.append(message) - assert received_messages[1:] == ["0", "1", "2", "3", "4"] + assert received_messages == ["0", "1", "2", "3", "4"] async def _test_workspace(app_id): @@ -63,6 +63,6 @@ async def _test_workspace(app_id): message = await websocket.recv() received_messages.append(message.strip()) - assert received_messages[1:] == [f"Here's string {i}" for i in range(10)] + assert received_messages == [f"Here's string {i}" for i in range(10)] except ConnectionClosedOK: pass diff --git a/tests/integration/jcloud/test_fastapi_app.py b/tests/integration/jcloud/test_fastapi_app.py index 8cf0bae6..7a38b33a 100644 --- a/tests/integration/jcloud/test_fastapi_app.py +++ b/tests/integration/jcloud/test_fastapi_app.py @@ -34,4 +34,4 @@ async def _test_ws_route(app_id): received_messages = [] async for message in websocket: received_messages.append(message.data) - assert received_messages[1:] == ["0", "1", "2", "3", "4"] + assert received_messages == ["0", "1", "2", "3", "4"] diff --git a/tests/integration/local/test_basic_app.py b/tests/integration/local/test_basic_app.py index fa5db24c..a5c513cc 100644 --- a/tests/integration/local/test_basic_app.py +++ b/tests/integration/local/test_basic_app.py @@ -47,11 +47,11 @@ async def test_basic_app_ws(run_test_app_locally, route): await websocket.send(json.dumps({"interval": 1})) received_messages = [] - for _ in range(6): + for _ in range(5): message = await websocket.recv() received_messages.append(message) - assert received_messages[1:] == ["0", "1", "2", "3", "4"] + assert received_messages == ["0", "1", "2", "3", "4"] @pytest.mark.parametrize( @@ -136,11 +136,11 @@ async def test_basic_app_ws_authorized(run_test_app_locally, route): await websocket.send(json.dumps({"interval": 1})) received_messages = [] - for _ in range(6): + for _ in range(5): message = await websocket.recv() received_messages.append(message) - assert received_messages[1:] == ["0", "1", "2", "3", "4"] + assert received_messages == ["0", "1", "2", "3", "4"] @pytest.mark.parametrize( @@ -284,11 +284,11 @@ async def test_metrics_ws(run_test_app_locally, route): await websocket.send(json.dumps({"interval": 1})) received_messages = [] - for _ in range(6): + for _ in range(5): message = await websocket.recv() received_messages.append(message) - assert received_messages[1:] == ["0", "1", "2", "3", "4"] + assert received_messages == ["0", "1", "2", "3", "4"] start_time = time.time() examine_request_duration_with_retry( @@ -322,8 +322,8 @@ async def test_workspace(run_test_app_locally): await websocket.send(json.dumps({})) received_messages = [] - for _ in range(11): + for _ in range(10): message = await websocket.recv() received_messages.append(message.strip()) - assert received_messages[1:] == [f"Here's string {i}" for i in range(10)] + assert received_messages == [f"Here's string {i}" for i in range(10)] diff --git a/tests/integration/local/test_fastapi_app.py b/tests/integration/local/test_fastapi_app.py index c1162821..de379e49 100644 --- a/tests/integration/local/test_fastapi_app.py +++ b/tests/integration/local/test_fastapi_app.py @@ -68,7 +68,7 @@ async def test_websocket_endpoint(run_fastapi_app_locally, route): received_messages = [] async for message in websocket: received_messages.append(message.data) - assert received_messages[1:] == ["0", "1", "2", "3", "4"] + assert received_messages == ["0", "1", "2", "3", "4"] @pytest.mark.parametrize( @@ -109,11 +109,11 @@ async def test_metrics_ws(run_fastapi_app_locally, route): await websocket.send(json.dumps({"interval": 1})) received_messages = [] - for _ in range(6): + for _ in range(5): message = await websocket.recv() received_messages.append(message) - assert received_messages[1:] == ["0", "1", "2", "3", "4"] + assert received_messages == ["0", "1", "2", "3", "4"] start_time = time.time() examine_request_duration_with_retry( From a30d8a3e6ebdc48575fb2cc53fa1c4c5da6f15c1 Mon Sep 17 00:00:00 2001 From: Zac Li Date: Tue, 23 May 2023 14:02:17 +0800 Subject: [PATCH 10/10] fix: take first ip --- lcserve/backend/gateway.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lcserve/backend/gateway.py b/lcserve/backend/gateway.py index 8ed0cfc7..f3f7a1c3 100644 --- a/lcserve/backend/gateway.py +++ b/lcserve/backend/gateway.py @@ -1123,7 +1123,7 @@ async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: if scope.get('headers'): for header in scope['headers']: if header[0].decode('latin-1') == 'x-forwarded-for': - ip_address = header[1].decode('latin-1') + ip_address = header[1].decode('latin-1').split(",")[0].strip() break # Init the request/connection ID @@ -1156,7 +1156,7 @@ async def custom_send(message: dict) -> None: ) elif scope["type"] == "websocket": self.logger.info( - f"WebSocket connection: {connection_id} - Path: {scope['path']} - Client IP: {ip_address} - Duration: {duration}" + f"WebSocket connection: {connection_id} - Path: {scope['path']} - Client IP: {ip_address} - Duration: {duration} s" ) else: