diff --git a/docs/fundamentals/flow/executor-args.md b/docs/fundamentals/flow/executor-args.md index 5dbdbf41382cb..663d4c1966a56 100644 --- a/docs/fundamentals/flow/executor-args.md +++ b/docs/fundamentals/flow/executor-args.md @@ -35,6 +35,12 @@ | `port_monitoring` | The port on which the prometheus server is exposed, default is a random port between [49152, 65535] | `string` | `random in [49152, 65535]` | | `retries` | Number of retries per gRPC call. If <0 it defaults to max(3, num_replicas) | `number` | `-1` | | `floating` | If set, the current Pod/Deployment can not be further chained, and the next `.add()` will chain after the last Pod/Deployment not this current one. | `boolean` | `False` | +| `tracing` | If set, the sdk implementation of the OpenTelemetry tracer will be available and will be enabled for automatic tracing of requests and customer span creation. Otherwise a no-op implementation will be provided. | `boolean` | `False` | +| `traces_exporter_host` | If tracing is enabled, this hostname will be used to configure the trace exporter agent. | `string` | `None` | +| `traces_exporter_port` | If tracing is enabled, this port will be used to configure the trace exporter agent. | `number` | `None` | +| `metrics` | If set, the sdk implementation of the OpenTelemetry metrics will be available for default monitoring and custom measurements. Otherwise a no-op implementation will be provided. | `boolean` | `False` | +| `metrics_exporter_host` | If tracing is enabled, this hostname will be used to configure the metrics exporter agent. | `string` | `None` | +| `metrics_exporter_port` | If tracing is enabled, this port will be used to configure the metrics exporter agent. | `number` | `None` | | `install_requirements` | If set, install `requirements.txt` in the Hub Executor bundle to local | `boolean` | `False` | | `force_update` | If set, always pull the latest Hub Executor bundle even it exists on local | `boolean` | `False` | | `compression` | The compression mechanism used when sending requests from the Head to the WorkerRuntimes. For more details, check https://grpc.github.io/grpc/python/grpc.html#compression. | `string` | `None` | diff --git a/docs/fundamentals/flow/gateway-args.md b/docs/fundamentals/flow/gateway-args.md index 285a9ed5c5170..999129acfc4d7 100644 --- a/docs/fundamentals/flow/gateway-args.md +++ b/docs/fundamentals/flow/gateway-args.md @@ -49,4 +49,10 @@ | `monitoring` | If set, spawn an http server with a prometheus endpoint to expose metrics | `boolean` | `False` | | `port_monitoring` | The port on which the prometheus server is exposed, default is a random port between [49152, 65535] | `string` | `random in [49152, 65535]` | | `retries` | Number of retries per gRPC call. If <0 it defaults to max(3, num_replicas) | `number` | `-1` | -| `floating` | If set, the current Pod/Deployment can not be further chained, and the next `.add()` will chain after the last Pod/Deployment not this current one. | `boolean` | `False` | \ No newline at end of file +| `floating` | If set, the current Pod/Deployment can not be further chained, and the next `.add()` will chain after the last Pod/Deployment not this current one. | `boolean` | `False` | +| `tracing` | If set, the sdk implementation of the OpenTelemetry tracer will be available and will be enabled for automatic tracing of requests and customer span creation. Otherwise a no-op implementation will be provided. 
| `boolean` | `False` | +| `traces_exporter_host` | If tracing is enabled, this hostname will be used to configure the trace exporter agent. | `string` | `None` | +| `traces_exporter_port` | If tracing is enabled, this port will be used to configure the trace exporter agent. | `number` | `None` | +| `metrics` | If set, the sdk implementation of the OpenTelemetry metrics will be available for default monitoring and custom measurements. Otherwise a no-op implementation will be provided. | `boolean` | `False` | +| `metrics_exporter_host` | If tracing is enabled, this hostname will be used to configure the metrics exporter agent. | `string` | `None` | +| `metrics_exporter_port` | If tracing is enabled, this port will be used to configure the metrics exporter agent. | `number` | `None` | \ No newline at end of file diff --git a/extra-requirements.txt b/extra-requirements.txt index 7ca4cb34e4e80..f803a5b3fdbe8 100644 --- a/extra-requirements.txt +++ b/extra-requirements.txt @@ -35,8 +35,17 @@ packaging>=20.0: core docarray>=0.16.4: core jina-hubble-sdk>=0.19.0: core jcloud>=0.0.35: core +opentelemetry-api>=1.12.0: core +opentelemetry-instrumentation-grpc>=0.33b0: core uvloop: perf,standard,devel prometheus_client: perf,standard,devel +opentelemetry-sdk>=1.12.0: perf,standard,devel +opentelemetry-exporter-otlp>=1.12.0: perf,standard,devel +opentelemetry-exporter-prometheus>=1.12.0rc1: perf,standard,devel +opentelemetry-semantic-conventions>=0.33b0: perf,standard,devel +opentelemetry-instrumentation-aiohttp-client>=0.33b0: perf,standard,devel +opentelemetry-instrumentation-fastapi>=0.33b0: perf,standard,devel +opentelemetry-exporter-otlp-proto-grpc>=1.13.0: perf,standrad,devel fastapi>=0.76.0: standard,devel uvicorn[standard]: standard,devel docarray[common]>=0.16.3: standard,devel @@ -77,3 +86,4 @@ bs4: cicd jsonschema: cicd portforward>=0.2.4: cicd tensorflow>=2.0: cicd +opentelemetry-test-utils>=0.33b0: test diff --git a/jina/clients/__init__.py b/jina/clients/__init__.py index d994468c4d801..c1451d94f35ec 100644 --- a/jina/clients/__init__.py +++ b/jina/clients/__init__.py @@ -20,10 +20,16 @@ def Client( *, asyncio: Optional[bool] = False, host: Optional[str] = '0.0.0.0', + metrics: Optional[bool] = False, + metrics_exporter_host: Optional[str] = None, + metrics_exporter_port: Optional[int] = None, port: Optional[int] = None, protocol: Optional[str] = 'GRPC', proxy: Optional[bool] = False, tls: Optional[bool] = False, + traces_exporter_host: Optional[str] = None, + traces_exporter_port: Optional[int] = None, + tracing: Optional[bool] = False, **kwargs ) -> Union[ 'AsyncWebSocketClient', @@ -37,10 +43,16 @@ def Client( :param asyncio: If set, then the input and output of this Client work in an asynchronous manner. :param host: The host address of the runtime, by default it is 0.0.0.0. In the case of an external Executor (`--external` or `external=True`) this can be a list of hosts, separated by commas. Then, every resulting address will be considered as one replica of the Executor. + :param metrics: If set, the sdk implementation of the OpenTelemetry metrics will be available for default monitoring and custom measurements. Otherwise a no-op implementation will be provided. + :param metrics_exporter_host: If tracing is enabled, this hostname will be used to configure the metrics exporter agent. + :param metrics_exporter_port: If tracing is enabled, this port will be used to configure the metrics exporter agent. :param port: The port of the Gateway, which the client should connect to. 
:param protocol: Communication protocol between server and client. :param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy :param tls: If set, connect to gateway using tls encryption + :param traces_exporter_host: If tracing is enabled, this hostname will be used to configure the trace exporter agent. + :param traces_exporter_port: If tracing is enabled, this port will be used to configure the trace exporter agent. + :param tracing: If set, the sdk implementation of the OpenTelemetry tracer will be available and will be enabled for automatic tracing of requests and customer span creation. Otherwise a no-op implementation will be provided. :return: the new Client object .. # noqa: DAR202 @@ -81,10 +93,16 @@ def Client( :param asyncio: If set, then the input and output of this Client work in an asynchronous manner. :param host: The host address of the runtime, by default it is 0.0.0.0. In the case of an external Executor (`--external` or `external=True`) this can be a list of hosts, separated by commas. Then, every resulting address will be considered as one replica of the Executor. + :param metrics: If set, the sdk implementation of the OpenTelemetry metrics will be available for default monitoring and custom measurements. Otherwise a no-op implementation will be provided. + :param metrics_exporter_host: If tracing is enabled, this hostname will be used to configure the metrics exporter agent. + :param metrics_exporter_port: If tracing is enabled, this port will be used to configure the metrics exporter agent. :param port: The port of the Gateway, which the client should connect to. :param protocol: Communication protocol between server and client. :param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy :param tls: If set, connect to gateway using tls encryption + :param traces_exporter_host: If tracing is enabled, this hostname will be used to configure the trace exporter agent. + :param traces_exporter_port: If tracing is enabled, this port will be used to configure the trace exporter agent. + :param tracing: If set, the sdk implementation of the OpenTelemetry tracer will be available and will be enabled for automatic tracing of requests and customer span creation. Otherwise a no-op implementation will be provided. :return: the new Client object .. # noqa: DAR102 diff --git a/jina/clients/base/__init__.py b/jina/clients/base/__init__.py index 90c1894e27b01..b5c3cce8a9c73 100644 --- a/jina/clients/base/__init__.py +++ b/jina/clients/base/__init__.py @@ -17,9 +17,10 @@ InputType = Union[GeneratorSourceType, Callable[..., GeneratorSourceType]] CallbackFnType = Optional[Callable[[Response], None]] +from jina.serve.instrumentation import InstrumentationMixin -class BaseClient(ABC): +class BaseClient(InstrumentationMixin, ABC): """A base client for connecting to the Flow Gateway. 
:param args: the Namespace from argparse @@ -46,6 +47,17 @@ def __init__( os.unsetenv('http_proxy') os.unsetenv('https_proxy') self._inputs = None + self._setup_instrumentation( + name=self.args.name + if hasattr(self.args, 'name') + else self.__class__.__name__, + tracing=self.args.tracing, + traces_exporter_host=self.args.traces_exporter_host, + traces_exporter_port=self.args.traces_exporter_port, + metrics=self.args.metrics, + metrics_exporter_host=self.args.metrics_exporter_host, + metrics_exporter_port=self.args.metrics_exporter_port, + ) send_telemetry_event(event='start', obj=self) @staticmethod diff --git a/jina/clients/base/grpc.py b/jina/clients/base/grpc.py index 48468ba420e0c..40e687999ed75 100644 --- a/jina/clients/base/grpc.py +++ b/jina/clients/base/grpc.py @@ -1,8 +1,8 @@ import asyncio +import json from typing import TYPE_CHECKING, Optional import grpc -import json from jina.clients.base import BaseClient from jina.clients.helper import callback_exec @@ -82,19 +82,27 @@ async def _get_results( # while loop with retries, check in which state the `iterator` remains after failure options = GrpcConnectionPool.get_default_grpc_options() if max_attempts > 1: - service_config_json = json.dumps({ - "methodConfig": [{ - # To apply retry to all methods, put [{}] in the "name" field - "name": [{}], - "retryPolicy": { - "maxAttempts": max_attempts, - "initialBackoff": f"{initial_backoff}s", - "maxBackoff": f"{max_backoff}s", - "backoffMultiplier": {backoff_multiplier}, - "retryableStatusCodes": ["UNAVAILABLE", "DEADLINE_EXCEEDED", "INTERNAL"], - }, - }] - }) + service_config_json = json.dumps( + { + "methodConfig": [ + { + # To apply retry to all methods, put [{}] in the "name" field + "name": [{}], + "retryPolicy": { + "maxAttempts": max_attempts, + "initialBackoff": f"{initial_backoff}s", + "maxBackoff": f"{max_backoff}s", + "backoffMultiplier": {backoff_multiplier}, + "retryableStatusCodes": [ + "UNAVAILABLE", + "DEADLINE_EXCEEDED", + "INTERNAL", + ], + }, + } + ] + } + ) # NOTE: the retry feature will be enabled by default >=v1.40.0 options.append(("grpc.enable_retries", 1)) options.append(("grpc.service_config", service_config_json)) @@ -104,6 +112,7 @@ async def _get_results( options=options, asyncio=True, tls=self.args.tls, + aio_tracing_client_interceptors=self.aio_tracing_client_interceptors(), ) as channel: stub = jina_pb2_grpc.JinaRPCStub(channel) self.logger.debug(f'connected to {self.args.host}:{self.args.port}') @@ -146,11 +155,13 @@ async def _get_results( ) raise ConnectionError(my_details) elif my_code == grpc.StatusCode.INTERNAL: - self.logger.error(f'{msg}\ninternal error on the server side') + self.logger.error( + f'{msg}\ninternal error on the server side' + ) raise err elif ( - my_code == grpc.StatusCode.UNKNOWN - and 'asyncio.exceptions.TimeoutError' in my_details + my_code == grpc.StatusCode.UNKNOWN + and 'asyncio.exceptions.TimeoutError' in my_details ): raise BadClientInput( f'{msg}\n' diff --git a/jina/clients/base/helper.py b/jina/clients/base/helper.py index 4ee9749dd4933..b408ddcc60045 100644 --- a/jina/clients/base/helper.py +++ b/jina/clients/base/helper.py @@ -1,7 +1,7 @@ import asyncio import random from abc import ABC, abstractmethod -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Optional from aiohttp import WSMsgType @@ -12,19 +12,25 @@ from jina.types.request.status import StatusMessage if TYPE_CHECKING: + from opentelemetry import trace + from jina.logging.logger import JinaLogger class AioHttpClientlet(ABC): """aiohttp 
session manager""" - def __init__(self, url: str, - logger: 'JinaLogger', - max_attempts: int = 1, - initial_backoff: float = 0.5, - max_backoff: float = 0.1, - backoff_multiplier: float = 1.5, - **kwargs) -> None: + def __init__( + self, + url: str, + logger: 'JinaLogger', + max_attempts: int = 1, + initial_backoff: float = 0.5, + max_backoff: float = 0.1, + backoff_multiplier: float = 1.5, + tracer_provider: Optional['trace.TraceProvider'] = None, + **kwargs, + ) -> None: """HTTP Client to be used with the streamer :param url: url to send http/websocket request to @@ -33,12 +39,19 @@ def __init__(self, url: str, :param initial_backoff: The first retry will happen with a delay of random(0, initial_backoff) :param max_backoff: The maximum accepted backoff after the exponential incremental delay :param backoff_multiplier: The n-th attempt will occur at random(0, min(initialBackoff*backoffMultiplier**(n-1), maxBackoff)) + :param tracer_provider: Optional tracer_provider that will be used to configure aiohttp tracing. :param kwargs: kwargs which will be forwarded to the `aiohttp.Session` instance. Used to pass headers to requests """ self.url = url self.logger = logger self.msg_recv = 0 self.msg_sent = 0 + if tracer_provider: + from opentelemetry.instrumentation.aiohttp_client import create_trace_config + + self._trace_config = [create_trace_config(tracer_provider=tracer_provider)] + else: + self._trace_config = None self.session = None self._session_kwargs = {} if kwargs.get('headers', None): @@ -90,7 +103,9 @@ async def start(self): with ImportExtensions(required=True): import aiohttp - self.session = aiohttp.ClientSession(**self._session_kwargs) + self.session = aiohttp.ClientSession( + **self._session_kwargs, trace_configs=self._trace_config + ) await self.session.__aenter__() return self @@ -125,7 +140,14 @@ async def send_message(self, request: 'Request'): if retry == self.max_attempts: raise else: - wait_time = random.uniform(0, min(self.initial_backoff*self.backoff_multiplier**(retry-1), self.max_backoff)) + wait_time = random.uniform( + 0, + min( + self.initial_backoff + * self.backoff_multiplier ** (retry - 1), + self.max_backoff, + ), + ) await asyncio.sleep(wait_time) async def send_dry_run(self, **kwargs): @@ -194,7 +216,14 @@ async def send_message(self, request: 'Request'): self.logger.critical(f'server connection closed already!') raise else: - wait_time = random.uniform(0, min(self.initial_backoff*self.backoff_multiplier**(retry-1), self.max_backoff)) + wait_time = random.uniform( + 0, + min( + self.initial_backoff + * self.backoff_multiplier ** (retry - 1), + self.max_backoff, + ), + ) await asyncio.sleep(wait_time) async def send_dry_run(self, **kwargs): diff --git a/jina/clients/base/http.py b/jina/clients/base/http.py index b78c033d0716a..dfbd8c3f39d61 100644 --- a/jina/clients/base/http.py +++ b/jina/clients/base/http.py @@ -25,19 +25,19 @@ def _handle_response_status(self, r_status, r_str, url): if r_status == status.HTTP_404_NOT_FOUND: raise BadClient(f'no such endpoint {url}') elif ( - r_status == status.HTTP_503_SERVICE_UNAVAILABLE - or r_status == status.HTTP_504_GATEWAY_TIMEOUT + r_status == status.HTTP_503_SERVICE_UNAVAILABLE + or r_status == status.HTTP_504_GATEWAY_TIMEOUT ): if ( - 'header' in r_str - and 'status' in r_str['header'] - and 'description' in r_str['header']['status'] + 'header' in r_str + and 'status' in r_str['header'] + and 'description' in r_str['header']['status'] ): raise ConnectionError(r_str['header']['status']['description']) else: raise 
ValueError(r_str) elif ( - r_status < status.HTTP_200_OK or r_status > status.HTTP_300_MULTIPLE_CHOICES + r_status < status.HTTP_200_OK or r_status > status.HTTP_300_MULTIPLE_CHOICES ): # failure codes raise ValueError(r_str) @@ -54,7 +54,12 @@ async def _is_flow_ready(self, **kwargs) -> bool: proto = 'https' if self.args.tls else 'http' url = f'{proto}://{self.args.host}:{self.args.port}/dry_run' iolet = await stack.enter_async_context( - HTTPClientlet(url=url, logger=self.logger, **kwargs) + HTTPClientlet( + url=url, + logger=self.logger, + tracer_provider=self.tracer_provider, + **kwargs, + ) ) response = await iolet.send_dry_run(**kwargs) @@ -75,16 +80,16 @@ async def _is_flow_ready(self, **kwargs) -> bool: return False async def _get_results( - self, - inputs: 'InputType', - on_done: 'CallbackFnType', - on_error: Optional['CallbackFnType'] = None, - on_always: Optional['CallbackFnType'] = None, - max_attempts: int = 1, - initial_backoff: float = 0.5, - max_backoff: float = 0.1, - backoff_multiplier: float = 1.5, - **kwargs, + self, + inputs: 'InputType', + on_done: 'CallbackFnType', + on_error: Optional['CallbackFnType'] = None, + on_always: Optional['CallbackFnType'] = None, + max_attempts: int = 1, + initial_backoff: float = 0.5, + max_backoff: float = 0.1, + backoff_multiplier: float = 1.5, + **kwargs, ): """ :param inputs: the callable @@ -113,12 +118,20 @@ async def _get_results( proto = 'https' if self.args.tls else 'http' url = f'{proto}://{self.args.host}:{self.args.port}/post' iolet = await stack.enter_async_context( - HTTPClientlet(url=url, logger=self.logger, max_attempts=max_attempts, initial_backoff=initial_backoff, - max_backoff=max_backoff, backoff_multiplier=backoff_multiplier, **kwargs) + HTTPClientlet( + url=url, + logger=self.logger, + tracer_provider=self.tracer_provider, + max_attempts=max_attempts, + initial_backoff=initial_backoff, + max_backoff=max_backoff, + backoff_multiplier=backoff_multiplier, + **kwargs, + ) ) def _request_handler( - request: 'Request', + request: 'Request', ) -> 'Tuple[asyncio.Future, Optional[asyncio.Future]]': """ For HTTP Client, for each request in the iterator, we `send_message` using diff --git a/jina/clients/base/websocket.py b/jina/clients/base/websocket.py index 2f56d3f49fbbc..2ed4991d7f2cc 100644 --- a/jina/clients/base/websocket.py +++ b/jina/clients/base/websocket.py @@ -33,7 +33,12 @@ async def _is_flow_ready(self, **kwargs) -> bool: proto = 'wss' if self.args.tls else 'ws' url = f'{proto}://{self.args.host}:{self.args.port}/dry_run' iolet = await stack.enter_async_context( - WebsocketClientlet(url=url, logger=self.logger, **kwargs) + WebsocketClientlet( + url=url, + logger=self.logger, + tracer_provider=self.tracer_provider, + **kwargs, + ) ) async def _receive(): @@ -72,16 +77,16 @@ async def _send(): return False async def _get_results( - self, - inputs: 'InputType', - on_done: 'CallbackFnType', - on_error: Optional['CallbackFnType'] = None, - on_always: Optional['CallbackFnType'] = None, - max_attempts: int = 1, - initial_backoff: float = 0.5, - max_backoff: float = 0.1, - backoff_multiplier: float = 1.5, - **kwargs, + self, + inputs: 'InputType', + on_done: 'CallbackFnType', + on_error: Optional['CallbackFnType'] = None, + on_always: Optional['CallbackFnType'] = None, + max_attempts: int = 1, + initial_backoff: float = 0.5, + max_backoff: float = 0.1, + backoff_multiplier: float = 1.5, + **kwargs, ): """ :param inputs: the callable @@ -110,9 +115,16 @@ async def _get_results( proto = 'wss' if self.args.tls else 'ws' url 
= f'{proto}://{self.args.host}:{self.args.port}/' iolet = await stack.enter_async_context( - WebsocketClientlet(url=url, logger=self.logger, max_attempts=max_attempts, - initial_backoff=initial_backoff, - max_backoff=max_backoff, backoff_multiplier=backoff_multiplier, **kwargs) + WebsocketClientlet( + url=url, + logger=self.logger, + tracer_provider=self.tracer_provider, + max_attempts=max_attempts, + initial_backoff=initial_backoff, + max_backoff=max_backoff, + backoff_multiplier=backoff_multiplier, + **kwargs, + ) ) request_buffer: Dict[ @@ -150,7 +162,7 @@ def _handle_end_of_iter(): asyncio.create_task(iolet.send_eoi()) def _request_handler( - request: 'Request', + request: 'Request', ) -> 'Tuple[asyncio.Future, Optional[asyncio.Future]]': """ For each request in the iterator, we send the `Message` using `iolet.send_message()`. diff --git a/jina/orchestrate/flow/base.py b/jina/orchestrate/flow/base.py index 8bce7012513d8..109a2722e3843 100644 --- a/jina/orchestrate/flow/base.py +++ b/jina/orchestrate/flow/base.py @@ -118,20 +118,32 @@ def __init__( *, asyncio: Optional[bool] = False, host: Optional[str] = '0.0.0.0', + metrics: Optional[bool] = False, + metrics_exporter_host: Optional[str] = None, + metrics_exporter_port: Optional[int] = None, port: Optional[int] = None, protocol: Optional[str] = 'GRPC', proxy: Optional[bool] = False, tls: Optional[bool] = False, + traces_exporter_host: Optional[str] = None, + traces_exporter_port: Optional[int] = None, + tracing: Optional[bool] = False, **kwargs, ): """Create a Flow. Flow is how Jina streamlines and scales Executors. This overloaded method provides arguments from `jina client` CLI. :param asyncio: If set, then the input and output of this Client work in an asynchronous manner. :param host: The host address of the runtime, by default it is 0.0.0.0. In the case of an external Executor (`--external` or `external=True`) this can be a list of hosts, separated by commas. Then, every resulting address will be considered as one replica of the Executor. + :param metrics: If set, the sdk implementation of the OpenTelemetry metrics will be available for default monitoring and custom measurements. Otherwise a no-op implementation will be provided. + :param metrics_exporter_host: If tracing is enabled, this hostname will be used to configure the metrics exporter agent. + :param metrics_exporter_port: If tracing is enabled, this port will be used to configure the metrics exporter agent. :param port: The port of the Gateway, which the client should connect to. :param protocol: Communication protocol between server and client. :param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy :param tls: If set, connect to gateway using tls encryption + :param traces_exporter_host: If tracing is enabled, this hostname will be used to configure the trace exporter agent. + :param traces_exporter_port: If tracing is enabled, this port will be used to configure the trace exporter agent. + :param tracing: If set, the sdk implementation of the OpenTelemetry tracer will be available and will be enabled for automatic tracing of requests and customer span creation. Otherwise a no-op implementation will be provided. .. # noqa: DAR202 .. 
# noqa: DAR101 @@ -165,6 +177,9 @@ def __init__( host: Optional[str] = '0.0.0.0', host_in: Optional[str] = '0.0.0.0', log_config: Optional[str] = None, + metrics: Optional[bool] = False, + metrics_exporter_host: Optional[str] = None, + metrics_exporter_port: Optional[int] = None, monitoring: Optional[bool] = False, name: Optional[str] = 'gateway', native: Optional[bool] = False, @@ -190,6 +205,9 @@ def __init__( timeout_ready: Optional[int] = 600000, timeout_send: Optional[int] = None, title: Optional[str] = None, + traces_exporter_host: Optional[str] = None, + traces_exporter_port: Optional[int] = None, + tracing: Optional[bool] = False, uses: Optional[Union[str, Type['BaseExecutor'], dict]] = None, uses_with: Optional[dict] = None, uvicorn_kwargs: Optional[dict] = None, @@ -229,6 +247,9 @@ def __init__( :param host: The host address of the runtime, by default it is 0.0.0.0. In the case of an external Executor (`--external` or `external=True`) this can be a list of hosts, separated by commas. Then, every resulting address will be considered as one replica of the Executor. :param host_in: The host address for binding to, by default it is 0.0.0.0 :param log_config: The YAML config of the logger used in this object. + :param metrics: If set, the sdk implementation of the OpenTelemetry metrics will be available for default monitoring and custom measurements. Otherwise a no-op implementation will be provided. + :param metrics_exporter_host: If tracing is enabled, this hostname will be used to configure the metrics exporter agent. + :param metrics_exporter_port: If tracing is enabled, this port will be used to configure the metrics exporter agent. :param monitoring: If set, spawn an http server with a prometheus endpoint to expose metrics :param name: The name of this object. @@ -281,6 +302,9 @@ def __init__( :param timeout_ready: The timeout in milliseconds of a Pod waits for the runtime to be ready, -1 for waiting forever :param timeout_send: The timeout in milliseconds used when sending data requests to Executors, -1 means no timeout, disabled by default :param title: The title of this HTTP server. It will be used in automatics docs such as Swagger UI. + :param traces_exporter_host: If tracing is enabled, this hostname will be used to configure the trace exporter agent. + :param traces_exporter_port: If tracing is enabled, this port will be used to configure the trace exporter agent. + :param tracing: If set, the sdk implementation of the OpenTelemetry tracer will be available and will be enabled for automatic tracing of requests and customer span creation. Otherwise a no-op implementation will be provided. :param uses: The config of the gateway, it could be one of the followings: * the string literal of an Gateway class name * a Gateway YAML file (.yml, .yaml, .jaml) @@ -386,10 +410,16 @@ def __init__( :param asyncio: If set, then the input and output of this Client work in an asynchronous manner. :param host: The host address of the runtime, by default it is 0.0.0.0. In the case of an external Executor (`--external` or `external=True`) this can be a list of hosts, separated by commas. Then, every resulting address will be considered as one replica of the Executor. + :param metrics: If set, the sdk implementation of the OpenTelemetry metrics will be available for default monitoring and custom measurements. Otherwise a no-op implementation will be provided. + :param metrics_exporter_host: If tracing is enabled, this hostname will be used to configure the metrics exporter agent. 
+ :param metrics_exporter_port: If tracing is enabled, this port will be used to configure the metrics exporter agent. :param port: The port of the Gateway, which the client should connect to. :param protocol: Communication protocol between server and client. :param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy :param tls: If set, connect to gateway using tls encryption + :param traces_exporter_host: If tracing is enabled, this hostname will be used to configure the trace exporter agent. + :param traces_exporter_port: If tracing is enabled, this port will be used to configure the trace exporter agent. + :param tracing: If set, the sdk implementation of the OpenTelemetry tracer will be available and will be enabled for automatic tracing of requests and customer span creation. Otherwise a no-op implementation will be provided. :param compression: The compression mechanism used when sending requests from the Head to the WorkerRuntimes. For more details, check https://grpc.github.io/grpc/python/grpc.html#compression. :param cors: If set, a CORS middleware is added to FastAPI frontend to allow cross-origin access. :param deployments_addresses: dictionary JSON with the input addresses of each Deployment @@ -420,6 +450,9 @@ def __init__( :param host: The host address of the runtime, by default it is 0.0.0.0. In the case of an external Executor (`--external` or `external=True`) this can be a list of hosts, separated by commas. Then, every resulting address will be considered as one replica of the Executor. :param host_in: The host address for binding to, by default it is 0.0.0.0 :param log_config: The YAML config of the logger used in this object. + :param metrics: If set, the sdk implementation of the OpenTelemetry metrics will be available for default monitoring and custom measurements. Otherwise a no-op implementation will be provided. + :param metrics_exporter_host: If tracing is enabled, this hostname will be used to configure the metrics exporter agent. + :param metrics_exporter_port: If tracing is enabled, this port will be used to configure the metrics exporter agent. :param monitoring: If set, spawn an http server with a prometheus endpoint to expose metrics :param name: The name of this object. @@ -472,6 +505,9 @@ def __init__( :param timeout_ready: The timeout in milliseconds of a Pod waits for the runtime to be ready, -1 for waiting forever :param timeout_send: The timeout in milliseconds used when sending data requests to Executors, -1 means no timeout, disabled by default :param title: The title of this HTTP server. It will be used in automatics docs such as Swagger UI. + :param traces_exporter_host: If tracing is enabled, this hostname will be used to configure the trace exporter agent. + :param traces_exporter_port: If tracing is enabled, this port will be used to configure the trace exporter agent. + :param tracing: If set, the sdk implementation of the OpenTelemetry tracer will be available and will be enabled for automatic tracing of requests and customer span creation. Otherwise a no-op implementation will be provided. 
:param uses: The config of the gateway, it could be one of the followings: * the string literal of an Gateway class name * a Gateway YAML file (.yml, .yaml, .jaml) @@ -853,6 +889,9 @@ def add( host_in: Optional[str] = '0.0.0.0', install_requirements: Optional[bool] = False, log_config: Optional[str] = None, + metrics: Optional[bool] = False, + metrics_exporter_host: Optional[str] = None, + metrics_exporter_port: Optional[int] = None, monitoring: Optional[bool] = False, name: Optional[str] = None, native: Optional[bool] = False, @@ -872,6 +911,9 @@ def add( timeout_ready: Optional[int] = 600000, timeout_send: Optional[int] = None, tls: Optional[bool] = False, + traces_exporter_host: Optional[str] = None, + traces_exporter_port: Optional[int] = None, + tracing: Optional[bool] = False, upload_files: Optional[List[str]] = None, uses: Optional[Union[str, Type['BaseExecutor'], dict]] = 'BaseExecutor', uses_after: Optional[Union[str, Type['BaseExecutor'], dict]] = None, @@ -915,6 +957,9 @@ def add( :param host_in: The host address for binding to, by default it is 0.0.0.0 :param install_requirements: If set, install `requirements.txt` in the Hub Executor bundle to local :param log_config: The YAML config of the logger used in this object. + :param metrics: If set, the sdk implementation of the OpenTelemetry metrics will be available for default monitoring and custom measurements. Otherwise a no-op implementation will be provided. + :param metrics_exporter_host: If tracing is enabled, this hostname will be used to configure the metrics exporter agent. + :param metrics_exporter_port: If tracing is enabled, this port will be used to configure the metrics exporter agent. :param monitoring: If set, spawn an http server with a prometheus endpoint to expose metrics :param name: The name of this object. @@ -958,6 +1003,9 @@ def add( :param timeout_ready: The timeout in milliseconds of a Pod waits for the runtime to be ready, -1 for waiting forever :param timeout_send: The timeout in milliseconds used when sending data requests to Executors, -1 means no timeout, disabled by default :param tls: If set, connect to deployment using tls encryption + :param traces_exporter_host: If tracing is enabled, this hostname will be used to configure the trace exporter agent. + :param traces_exporter_port: If tracing is enabled, this port will be used to configure the trace exporter agent. + :param tracing: If set, the sdk implementation of the OpenTelemetry tracer will be available and will be enabled for automatic tracing of requests and customer span creation. Otherwise a no-op implementation will be provided. :param upload_files: The files on the host to be uploaded to the remote workspace. This can be useful when your Deployment has more file dependencies beyond a single YAML file, e.g. @@ -1064,6 +1112,9 @@ def add( :param host_in: The host address for binding to, by default it is 0.0.0.0 :param install_requirements: If set, install `requirements.txt` in the Hub Executor bundle to local :param log_config: The YAML config of the logger used in this object. + :param metrics: If set, the sdk implementation of the OpenTelemetry metrics will be available for default monitoring and custom measurements. Otherwise a no-op implementation will be provided. + :param metrics_exporter_host: If tracing is enabled, this hostname will be used to configure the metrics exporter agent. + :param metrics_exporter_port: If tracing is enabled, this port will be used to configure the metrics exporter agent. 
:param monitoring: If set, spawn an http server with a prometheus endpoint to expose metrics :param name: The name of this object. @@ -1107,6 +1158,9 @@ def add( :param timeout_ready: The timeout in milliseconds of a Pod waits for the runtime to be ready, -1 for waiting forever :param timeout_send: The timeout in milliseconds used when sending data requests to Executors, -1 means no timeout, disabled by default :param tls: If set, connect to deployment using tls encryption + :param traces_exporter_host: If tracing is enabled, this hostname will be used to configure the trace exporter agent. + :param traces_exporter_port: If tracing is enabled, this port will be used to configure the trace exporter agent. + :param tracing: If set, the sdk implementation of the OpenTelemetry tracer will be available and will be enabled for automatic tracing of requests and customer span creation. Otherwise a no-op implementation will be provided. :param upload_files: The files on the host to be uploaded to the remote workspace. This can be useful when your Deployment has more file dependencies beyond a single YAML file, e.g. diff --git a/jina/parsers/client.py b/jina/parsers/client.py index 76634ab10516e..f302eec682b90 100644 --- a/jina/parsers/client.py +++ b/jina/parsers/client.py @@ -30,3 +30,47 @@ def mixin_client_features_parser(parser): default=False, help='If set, then the input and output of this Client work in an asynchronous manner. ', ) + + parser.add_argument( + '--tracing', + action='store_true', + default=False, + help='If set, the sdk implementation of the OpenTelemetry tracer will be available and will be enabled for automatic tracing of requests and customer span creation. ' + 'Otherwise a no-op implementation will be provided.', + ) + + parser.add_argument( + '--traces-exporter-host', + type=str, + default=None, + help='If tracing is enabled, this hostname will be used to configure the trace exporter agent.', + ) + + parser.add_argument( + '--traces-exporter-port', + type=int, + default=None, + help='If tracing is enabled, this port will be used to configure the trace exporter agent.', + ) + + parser.add_argument( + '--metrics', + action='store_true', + default=False, + help='If set, the sdk implementation of the OpenTelemetry metrics will be available for default monitoring and custom measurements. ' + 'Otherwise a no-op implementation will be provided.', + ) + + parser.add_argument( + '--metrics-exporter-host', + type=str, + default=None, + help='If tracing is enabled, this hostname will be used to configure the metrics exporter agent.', + ) + + parser.add_argument( + '--metrics-exporter-port', + type=int, + default=None, + help='If tracing is enabled, this port will be used to configure the metrics exporter agent.', + ) diff --git a/jina/parsers/orchestrate/pod.py b/jina/parsers/orchestrate/pod.py index e51bb2935e1b4..3e98e20593afd 100644 --- a/jina/parsers/orchestrate/pod.py +++ b/jina/parsers/orchestrate/pod.py @@ -141,3 +141,47 @@ def mixin_pod_parser(parser, pod_type: str = 'worker'): help='If set, the current Pod/Deployment can not be further chained, ' 'and the next `.add()` will chain after the last Pod/Deployment not this current one.', ) + + gp.add_argument( + '--tracing', + action='store_true', + default=False, + help='If set, the sdk implementation of the OpenTelemetry tracer will be available and will be enabled for automatic tracing of requests and customer span creation. 
' + 'Otherwise a no-op implementation will be provided.', + ) + + parser.add_argument( + '--traces-exporter-host', + type=str, + default=None, + help='If tracing is enabled, this hostname will be used to configure the trace exporter agent.', + ) + + parser.add_argument( + '--traces-exporter-port', + type=int, + default=None, + help='If tracing is enabled, this port will be used to configure the trace exporter agent.', + ) + + parser.add_argument( + '--metrics', + action='store_true', + default=False, + help='If set, the sdk implementation of the OpenTelemetry metrics will be available for default monitoring and custom measurements. ' + 'Otherwise a no-op implementation will be provided.', + ) + + parser.add_argument( + '--metrics-exporter-host', + type=str, + default=None, + help='If tracing is enabled, this hostname will be used to configure the metrics exporter agent.', + ) + + parser.add_argument( + '--metrics-exporter-port', + type=int, + default=None, + help='If tracing is enabled, this port will be used to configure the metrics exporter agent.', + ) diff --git a/jina/resources/extra-requirements.txt b/jina/resources/extra-requirements.txt index 7ca4cb34e4e80..f803a5b3fdbe8 100644 --- a/jina/resources/extra-requirements.txt +++ b/jina/resources/extra-requirements.txt @@ -35,8 +35,17 @@ packaging>=20.0: core docarray>=0.16.4: core jina-hubble-sdk>=0.19.0: core jcloud>=0.0.35: core +opentelemetry-api>=1.12.0: core +opentelemetry-instrumentation-grpc>=0.33b0: core uvloop: perf,standard,devel prometheus_client: perf,standard,devel +opentelemetry-sdk>=1.12.0: perf,standard,devel +opentelemetry-exporter-otlp>=1.12.0: perf,standard,devel +opentelemetry-exporter-prometheus>=1.12.0rc1: perf,standard,devel +opentelemetry-semantic-conventions>=0.33b0: perf,standard,devel +opentelemetry-instrumentation-aiohttp-client>=0.33b0: perf,standard,devel +opentelemetry-instrumentation-fastapi>=0.33b0: perf,standard,devel +opentelemetry-exporter-otlp-proto-grpc>=1.13.0: perf,standrad,devel fastapi>=0.76.0: standard,devel uvicorn[standard]: standard,devel docarray[common]>=0.16.3: standard,devel @@ -77,3 +86,4 @@ bs4: cicd jsonschema: cicd portforward>=0.2.4: cicd tensorflow>=2.0: cicd +opentelemetry-test-utils>=0.33b0: test diff --git a/jina/serve/executors/__init__.py b/jina/serve/executors/__init__.py index 008f0cb3bc1b0..f84f820e0312a 100644 --- a/jina/serve/executors/__init__.py +++ b/jina/serve/executors/__init__.py @@ -13,11 +13,12 @@ from jina.importer import ImportExtensions from jina.jaml import JAML, JAMLCompatible, env_var_regex, internal_var_regex from jina.logging.logger import JinaLogger -from jina.serve.executors.decorators import avoid_concurrent_lock_cls, requests -from jina.serve.executors.metas import get_default_metas, get_executor_taboo +from jina.serve.executors.decorators import avoid_concurrent_lock_cls +from jina.serve.executors.metas import get_executor_taboo from jina.serve.helper import store_init_kwargs, wrap_func if TYPE_CHECKING: + from opentelemetry.context.context import Context from prometheus_client import Summary __dry_run_endpoint__ = '_jina_dry_run_' @@ -133,6 +134,7 @@ def __init__( self._add_requests(requests) self._add_runtime_args(runtime_args) self._init_monitoring() + self._init_instrumentation(runtime_args) self._init_workspace = workspace self.logger = JinaLogger(self.__class__.__name__) if __dry_run_endpoint__ not in self.requests: @@ -178,6 +180,28 @@ def _init_monitoring(self): self._summary_method = None self._metrics_buffer = None + def 
_init_instrumentation(self, _runtime_args: Optional[Dict] = None): + if not _runtime_args: + _runtime_args = {} + + instrumenting_module_name = _runtime_args.get('name', self.__class__.__name__) + + args_tracer_provider = _runtime_args.get('tracer_provider', None) + if args_tracer_provider: + self.tracer_provider = args_tracer_provider + self.tracer = self.tracer_provider.get_tracer(instrumenting_module_name) + else: + self.tracer_provider = None + self.tracer = None + + args_meter_provider = _runtime_args.get('meter_provider', None) + if args_meter_provider: + self.meter_provider = args_meter_provider + self.meter = self.meter_provider.get_meter(instrumenting_module_name) + else: + self.meter_provider = None + self.meter = None + def _add_requests(self, _requests: Optional[Dict]): if not hasattr(self, 'requests'): self.requests = {} @@ -292,7 +316,16 @@ async def __acall__(self, req_endpoint: str, **kwargs): elif __default_endpoint__ in self.requests: return await self.__acall_endpoint__(__default_endpoint__, **kwargs) - async def __acall_endpoint__(self, req_endpoint, **kwargs): + async def __acall_endpoint__( + self, req_endpoint, tracing_context: Optional['Context'], **kwargs + ): + async def exec_func(summary, tracing_context): + with summary: + if iscoroutinefunction(func): + return await func(self, tracing_context=tracing_context, **kwargs) + else: + return func(self, tracing_context=tracing_context, **kwargs) + func = self.requests[req_endpoint] runtime_name = ( @@ -307,11 +340,18 @@ async def __acall_endpoint__(self, req_endpoint, **kwargs): else contextlib.nullcontext() ) - with _summary: - if iscoroutinefunction(func): - return await func(self, **kwargs) - else: - return func(self, **kwargs) + if self.tracer: + with self.tracer.start_span(req_endpoint, context=tracing_context) as _: + from opentelemetry.propagate import extract + from opentelemetry.trace.propagation.tracecontext import ( + TraceContextTextMapPropagator, + ) + + tracing_carrier_context = {} + TraceContextTextMapPropagator().inject(tracing_carrier_context) + return await exec_func(_summary, extract(tracing_carrier_context)) + else: + return await exec_func(_summary, None) @property def workspace(self) -> Optional[str]: diff --git a/jina/serve/gateway.py b/jina/serve/gateway.py index 7d9d3c7ef2039..45102f3dd28a8 100644 --- a/jina/serve/gateway.py +++ b/jina/serve/gateway.py @@ -1,10 +1,7 @@ import abc import argparse -import functools -import inspect -from typing import TYPE_CHECKING, Callable, Optional +from typing import TYPE_CHECKING, Optional, Sequence -from jina.helper import convert_tuple_to_list from jina.jaml import JAMLCompatible from jina.logging.logger import JinaLogger from jina.serve.helper import store_init_kwargs, wrap_func @@ -13,6 +10,11 @@ __all__ = ['BaseGateway'] if TYPE_CHECKING: + from grpc.aio._interceptor import ClientInterceptor, ServerInterceptor + from opentelemetry import trace + from opentelemetry.instrumentation.grpc._client import ( + OpenTelemetryClientInterceptor, + ) from prometheus_client import CollectorRegistry @@ -72,20 +74,35 @@ def __init__( # TODO: original implementation also passes args, maybe move this to a setter/initializer func self.logger = JinaLogger(self.name) - def set_streamer( + def inject_dependencies( self, args: 'argparse.Namespace' = None, timeout_send: Optional[float] = None, metrics_registry: Optional['CollectorRegistry'] = None, runtime_name: Optional[str] = None, + tracing: Optional[bool] = False, + tracer_provider: Optional['trace.TracerProvider'] = 
None,
+        grpc_tracing_server_interceptors: Optional[
+            Sequence['ServerInterceptor']
+        ] = None,
+        aio_tracing_client_interceptors: Optional[Sequence['ClientInterceptor']] = None,
+        tracing_client_interceptor: Optional['OpenTelemetryClientInterceptor'] = None,
     ):
         """
-        Set streamer object by providing runtime parameters.
+        Set additional dependencies by providing runtime parameters.
 
         :param args: runtime args
         :param timeout_send: grpc connection timeout
         :param metrics_registry: metric registry when monitoring is enabled
         :param runtime_name: name of the runtime providing the streamer
+        :param tracing: Enables tracing if set to True.
+        :param tracer_provider: If tracing is enabled, the tracer_provider will be used to instrument the code.
+        :param grpc_tracing_server_interceptors: List of asyncio gRPC server tracing interceptors for tracing requests.
+        :param aio_tracing_client_interceptors: List of asyncio gRPC client tracing interceptors for tracing requests if asyncio is True.
+        :param tracing_client_interceptor: A gRPC client tracing interceptor for tracing requests if asyncio is False.
         """
+        self.tracing = tracing
+        self.tracer_provider = tracer_provider
+        self.grpc_tracing_server_interceptors = grpc_tracing_server_interceptors
         import json
 
         from jina.serve.streamer import GatewayStreamer
@@ -107,6 +124,8 @@ def set_streamer(
             prefetch=args.prefetch,
             logger=self.logger,
             metrics_registry=metrics_registry,
+            aio_tracing_client_interceptors=aio_tracing_client_interceptors,
+            tracing_client_interceptor=tracing_client_interceptor,
         )
 
     @abc.abstractmethod
diff --git a/jina/serve/instrumentation/__init__.py b/jina/serve/instrumentation/__init__.py
new file mode 100644
index 0000000000000..64786f3478745
--- /dev/null
+++ b/jina/serve/instrumentation/__init__.py
@@ -0,0 +1,125 @@
+from typing import TYPE_CHECKING, Optional, Sequence
+
+if TYPE_CHECKING:
+    from grpc.aio._interceptor import ClientInterceptor, ServerInterceptor
+    from opentelemetry.instrumentation.grpc._client import (
+        OpenTelemetryClientInterceptor,
+    )
+
+
+class InstrumentationMixin:
+    '''Instrumentation mixin for OpenTelemetry Tracing and Metrics handling'''
+
+    def _setup_instrumentation(
+        self,
+        name: str,
+        tracing: Optional[bool] = False,
+        traces_exporter_host: Optional[str] = '0.0.0.0',
+        traces_exporter_port: Optional[int] = 6831,
+        metrics: Optional[bool] = False,
+        metrics_exporter_host: Optional[str] = '0.0.0.0',
+        metrics_exporter_port: Optional[int] = 6831,
+    ) -> None:
+
+        self.tracing = tracing
+        self.metrics = metrics
+
+        if tracing:
+            from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
+                OTLPSpanExporter,
+            )
+            from opentelemetry.sdk.resources import SERVICE_NAME, Resource
+            from opentelemetry.sdk.trace import TracerProvider
+            from opentelemetry.sdk.trace.export import BatchSpanProcessor
+
+            resource = Resource(attributes={SERVICE_NAME: name})
+            provider = TracerProvider(resource=resource)
+            processor = BatchSpanProcessor(
+                OTLPSpanExporter(
+                    endpoint=f'{traces_exporter_host}:{traces_exporter_port}',
+                    insecure=True,
+                )
+            )
+            provider.add_span_processor(processor)
+            self.tracer_provider = provider
+            self.tracer = provider.get_tracer(name)
+        else:
+            self.tracer_provider = None
+            self.tracer = None
+
+        if metrics:
+            from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import (
+                OTLPMetricExporter,
+            )
+            from opentelemetry.sdk.metrics import MeterProvider
+            from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
+            from opentelemetry.sdk.resources import SERVICE_NAME, 
Resource + + resource = Resource(attributes={SERVICE_NAME: name}) + + metric_reader = PeriodicExportingMetricReader( + OTLPMetricExporter( + endpoint=f'{metrics_exporter_host}:{metrics_exporter_port}', + insecure=True, + ) + ) + meter_provider = MeterProvider( + metric_readers=[metric_reader], resource=resource + ) + self.meter_provider = meter_provider + self.meter = self.meter_provider.get_meter(name) + else: + self.meter_provider = None + self.meter = None + + def aio_tracing_server_interceptors( + self, + ) -> Optional[Sequence['ServerInterceptor']]: + '''Create a gRPC aio server interceptor. + :returns: A service-side aio interceptor object. + ''' + if self.tracing: + from jina.serve.instrumentation._aio_server import ( + OpenTelemetryAioServerInterceptor, + ) + + return [OpenTelemetryAioServerInterceptor(self.tracer)] + else: + return None + + def aio_tracing_client_interceptors( + self, + ) -> Optional[Sequence['ClientInterceptor']]: + '''Create a gRPC client aio channel interceptor. + :returns: An invocation-side list of aio interceptor objects. + ''' + + if self.tracing: + from jina.serve.instrumentation._aio_client import ( + StreamStreamAioClientInterceptor, + StreamUnaryAioClientInterceptor, + UnaryStreamAioClientInterceptor, + UnaryUnaryAioClientInterceptor, + ) + + return [ + UnaryUnaryAioClientInterceptor(self.tracer), + UnaryStreamAioClientInterceptor(self.tracer), + StreamUnaryAioClientInterceptor(self.tracer), + StreamStreamAioClientInterceptor(self.tracer), + ] + else: + return None + + def tracing_client_interceptor(self) -> Optional['OpenTelemetryClientInterceptor']: + ''' + :returns: a gRPC client interceptor with the global tracing provider. + ''' + if self.tracing: + from opentelemetry.instrumentation.grpc import ( + client_interceptor as grpc_client_interceptor, + ) + + return grpc_client_interceptor(self.tracer_provider) + else: + return None diff --git a/jina/serve/instrumentation/_aio_client.py b/jina/serve/instrumentation/_aio_client.py new file mode 100644 index 0000000000000..a8d00568e21cf --- /dev/null +++ b/jina/serve/instrumentation/_aio_client.py @@ -0,0 +1,319 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import functools +from collections import OrderedDict + +import grpc +from grpc.aio import ClientCallDetails +from opentelemetry import context +from opentelemetry.instrumentation.grpc._client import ( + OpenTelemetryClientInterceptor, + _carrier_setter, +) +from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY +from opentelemetry.propagate import inject +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.trace.status import Status, StatusCode + + +def _unary_done_callback(span, code, details): + def callback(call): + try: + span.set_attribute( + SpanAttributes.RPC_GRPC_STATUS_CODE, + code.value[0], + ) + if code != grpc.StatusCode.OK: + span.set_status( + Status( + status_code=StatusCode.ERROR, + description=details, + ) + ) + finally: + span.end() + + return callback + + +class _BaseAioClientInterceptor(OpenTelemetryClientInterceptor): + @staticmethod + def propagate_trace_in_details(client_call_details): + ''' + # noqa: DAR101 + # noqa: DAR201 + ''' + metadata = client_call_details.metadata + if not metadata: + mutable_metadata = OrderedDict() + else: + mutable_metadata = OrderedDict(metadata) + + inject(mutable_metadata, setter=_carrier_setter) + metadata = tuple(mutable_metadata.items()) + + return ClientCallDetails( + client_call_details.method, + client_call_details.timeout, + metadata, + client_call_details.credentials, + client_call_details.wait_for_ready, + ) + + @staticmethod + def add_error_details_to_span(span, exc): + if isinstance(exc, grpc.RpcError): + span.set_attribute( + SpanAttributes.RPC_GRPC_STATUS_CODE, + exc.code().value[0], + ) + span.set_status( + Status( + status_code=StatusCode.ERROR, + description=f"{type(exc).__name__}: {exc}", + ) + ) + span.record_exception(exc) + + async def _wrap_unary_response(self, continuation, span): + ''' + # noqa: DAR101 + # noqa: DAR201 + ''' + try: + call = await continuation() + + # code and details are both coroutines that need to be await-ed, + # the callbacks added with add_done_callback do not allow async + # code so we need to get the code and details here then pass them + # to the callback. + code = await call.code() + details = await call.details() + + call.add_done_callback(_unary_done_callback(span, code, details)) + + return call + except grpc.aio.AioRpcError as exc: + self.add_error_details_to_span(span, exc) + raise exc + + async def _wrap_stream_response(self, span, call): + try: + async for response in call: + yield response + except Exception as exc: + self.add_error_details_to_span(span, exc) + raise exc + finally: + span.end() + + +class UnaryUnaryAioClientInterceptor( + grpc.aio.UnaryUnaryClientInterceptor, + _BaseAioClientInterceptor, +): + '''Affords intercepting unary-unary invocations.''' + + async def intercept_unary_unary(self, continuation, client_call_details, request): + '''Intercepts a unary-unary invocation asynchronously. + + :param continuation: A coroutine that proceeds with the invocation by executing + the next interceptor in the chain or invoking the actual RPC on the + underlying Channel. It is the interceptor's responsibility to call it if + it decides to move the RPC forward. The interceptor can use + `call = await continuation(client_call_details, request)` to continue with + the RPC. `continuation` returns the call to the RPC. + :param client_call_details: A ClientCallDetails object describing the outgoing RPC. + :param request: The request value for the RPC. + + :returns: An object with the RPC response. 
+ + :raises: AioRpcError: Indicating that the RPC terminated with non-OK status. + :raises: asyncio.CancelledError: Indicating that the RPC was canceled. + ''' + + if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + return await continuation(client_call_details, request) + + method = client_call_details.method.decode("utf-8") + with self._start_span( + method, + end_on_exit=False, + record_exception=False, + set_status_on_exception=False, + ) as span: + new_details = self.propagate_trace_in_details(client_call_details) + + continuation_with_args = functools.partial( + continuation, new_details, request + ) + return await self._wrap_unary_response(continuation_with_args, span) + + +class UnaryStreamAioClientInterceptor( + grpc.aio.UnaryStreamClientInterceptor, + _BaseAioClientInterceptor, +): + '''Affords intercepting unary-stream invocations.''' + + async def intercept_unary_stream(self, continuation, client_call_details, request): + '''Intercepts a unary-stream invocation asynchronously. + + The function could return the call object or an asynchronous + iterator, in case of being an asyncrhonous iterator this will + become the source of the reads done by the caller. + + :param continuation: A coroutine that proceeds with the invocation by + executing the next interceptor in the chain or invoking the + actual RPC on the underlying Channel. It is the interceptor's + responsibility to call it if it decides to move the RPC forward. + The interceptor can use + `call = await continuation(client_call_details, request)` + to continue with the RPC. `continuation` returns the call to the + RPC. + :param client_call_details: A ClientCallDetails object describing the + outgoing RPC. + :param request: The request value for the RPC. + + :returns: The RPC Call or an asynchronous iterator. + + :raises: AioRpcError: Indicating that the RPC terminated with non-OK status. + :raises: asyncio.CancelledError: Indicating that the RPC was canceled. + ''' + + if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + return await continuation(client_call_details, request) + + method = client_call_details.method.decode("utf-8") + with self._start_span( + method, + end_on_exit=False, + record_exception=False, + set_status_on_exception=False, + ) as span: + new_details = self.propagate_trace_in_details(client_call_details) + + resp = await continuation(new_details, request) + + return self._wrap_stream_response(span, resp) + + +class StreamUnaryAioClientInterceptor( + grpc.aio.StreamUnaryClientInterceptor, + _BaseAioClientInterceptor, +): + '''Affords intercepting stream-unary invocations.''' + + async def intercept_stream_unary( + self, continuation, client_call_details, request_iterator + ): + '''Intercepts a stream-unary invocation asynchronously. + + Within the interceptor the usage of the call methods like `write` or + even awaiting the call should be done carefully, since the caller + could be expecting an untouched call, for example for start writing + messages to it. + + :param continuation: A coroutine that proceeds with the invocation by + executing the next interceptor in the chain or invoking the + actual RPC on the underlying Channel. It is the interceptor's + responsibility to call it if it decides to move the RPC forward. + The interceptor can use + `call = await continuation(client_call_details, request_iterator)` + to continue with the RPC. `continuation` returns the call to the + RPC. + :param client_call_details: A ClientCallDetails object describing the + outgoing RPC. 
+ :param request_iterator: The request iterator that will produce requests + for the RPC. + + :returns: The RPC Call. + + :raises: AioRpcError: Indicating that the RPC terminated with non-OK status. + :raises: asyncio.CancelledError: Indicating that the RPC was canceled. + ''' + + if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + return await continuation(client_call_details, request_iterator) + + method = client_call_details.method.decode("utf-8") + with self._start_span( + method, + end_on_exit=False, + record_exception=False, + set_status_on_exception=False, + ) as span: + new_details = self.propagate_trace_in_details(client_call_details) + + continuation_with_args = functools.partial( + continuation, new_details, request_iterator + ) + return await self._wrap_unary_response(continuation_with_args, span) + + +class StreamStreamAioClientInterceptor( + grpc.aio.StreamStreamClientInterceptor, + _BaseAioClientInterceptor, +): + '''Affords intercepting stream-stream invocations.''' + + async def intercept_stream_stream( + self, continuation, client_call_details, request_iterator + ): + '''Intercepts a stream-stream invocation asynchronously. + + Within the interceptor the usage of the call methods like `write` or + even awaiting the call should be done carefully, since the caller + could be expecting an untouched call, for example for start writing + messages to it. + + The function could return the call object or an asynchronous + iterator, in case of being an asyncrhonous iterator this will + become the source of the reads done by the caller. + + :param continuation: A coroutine that proceeds with the invocation by + executing the next interceptor in the chain or invoking the + actual RPC on the underlying Channel. It is the interceptor's + responsibility to call it if it decides to move the RPC forward. + The interceptor can use + `call = await continuation(client_call_details, request_iterator)` + to continue with the RPC. `continuation` returns the call to the + RPC. + :param client_call_details: A ClientCallDetails object describing the + outgoing RPC. + :param request_iterator: The request iterator that will produce requests + for the RPC. + + :returns: The RPC Call or an asynchronous iterator. + + :raises: AioRpcError: Indicating that the RPC terminated with non-OK status. + :raises: asyncio.CancelledError: Indicating that the RPC was canceled. + ''' + + if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + return await continuation(client_call_details, request_iterator) + + method = client_call_details.method.decode("utf-8") + with self._start_span( + method, + end_on_exit=False, + record_exception=False, + set_status_on_exception=False, + ) as span: + new_details = self.propagate_trace_in_details(client_call_details) + + resp = await continuation(new_details, request_iterator) + + return self._wrap_stream_response(span, resp) diff --git a/jina/serve/instrumentation/_aio_server.py b/jina/serve/instrumentation/_aio_server.py new file mode 100644 index 0000000000000..46aaf13ace29f --- /dev/null +++ b/jina/serve/instrumentation/_aio_server.py @@ -0,0 +1,108 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
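# Illustrative sketch (not part of this changeset): attaching the four aio
# client interceptors defined in _aio_client.py to a grpc.aio channel. The
# single-tracer constructor is inherited from OpenTelemetryClientInterceptor,
# and the module path is inferred from the sibling _aio_server.py file; treat
# both as assumptions.
import grpc
from opentelemetry import trace

from jina.serve.instrumentation._aio_client import (
    StreamStreamAioClientInterceptor,
    StreamUnaryAioClientInterceptor,
    UnaryStreamAioClientInterceptor,
    UnaryUnaryAioClientInterceptor,
)

tracer = trace.get_tracer(__name__)
interceptors = [
    UnaryUnaryAioClientInterceptor(tracer),
    UnaryStreamAioClientInterceptor(tracer),
    StreamUnaryAioClientInterceptor(tracer),
    StreamStreamAioClientInterceptor(tracer),
]
# every RPC made through this channel now carries the active trace context
channel = grpc.aio.insecure_channel('localhost:54321', interceptors=interceptors)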
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc.aio +from opentelemetry.instrumentation.grpc._server import ( + OpenTelemetryServerInterceptor, + _OpenTelemetryServicerContext, + _wrap_rpc_behavior, +) + + +class OpenTelemetryAioServerInterceptor( + grpc.aio.ServerInterceptor, OpenTelemetryServerInterceptor +): + ''' + An AsyncIO gRPC server interceptor, to add OpenTelemetry. + Usage:: + tracer = some OpenTelemetry tracer + interceptors = [ + AsyncOpenTelemetryServerInterceptor(tracer), + ] + server = aio.server( + futures.ThreadPoolExecutor(max_workers=concurrency), + interceptors = (interceptors,)) + ''' + + async def intercept_service(self, continuation, handler_call_details): + ''' + # noqa: DAR101 + # noqa: DAR102 + # noqa: DAR201 + ''' + + def telemetry_wrapper(behavior, request_streaming, response_streaming): + # handle streaming responses specially + if response_streaming: + return self._intercept_aio_server_stream( + behavior, + handler_call_details, + ) + + return self._intercept_aio_server_unary( + behavior, + handler_call_details, + ) + + next_handler = await continuation(handler_call_details) + + return _wrap_rpc_behavior(next_handler, telemetry_wrapper) + + def _intercept_aio_server_unary(self, behavior, handler_call_details): + async def _unary_interceptor(request_or_iterator, context): + with self._set_remote_context(context): + with self._start_span( + handler_call_details, + context, + set_status_on_exception=False, + ) as span: + # wrap the context + context = _OpenTelemetryServicerContext(context, span) + + # And now we run the actual RPC. + try: + return await behavior(request_or_iterator, context) + + except Exception as error: + # Bare exceptions are likely to be gRPC aborts, which + # we handle in our context wrapper. + # Here, we're interested in uncaught exceptions. 
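# Illustrative sketch (not part of this changeset): the Usage snippet in the
# docstring above refers to `AsyncOpenTelemetryServerInterceptor`, while the
# class defined here is `OpenTelemetryAioServerInterceptor`. Wiring it into an
# asyncio gRPC server looks roughly like this (the single-tracer constructor
# is inherited from OpenTelemetryServerInterceptor and is an assumption):
import grpc
from opentelemetry import trace

from jina.serve.instrumentation._aio_server import OpenTelemetryAioServerInterceptor

tracer = trace.get_tracer(__name__)
server = grpc.aio.server(
    interceptors=[OpenTelemetryAioServerInterceptor(tracer)],
)
# register servicers and ports as usual, then `await server.start()`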
+ # pylint:disable=unidiomatic-typecheck + if type(error) != Exception: + span.record_exception(error) + raise error + + return _unary_interceptor + + def _intercept_aio_server_stream(self, behavior, handler_call_details): + async def _stream_interceptor(request_or_iterator, context): + with self._set_remote_context(context): + with self._start_span( + handler_call_details, + context, + set_status_on_exception=False, + ) as span: + context = _OpenTelemetryServicerContext(context, span) + + try: + async for response in behavior(request_or_iterator, context): + yield response + + except Exception as error: + # pylint:disable=unidiomatic-typecheck + if type(error) != Exception: + span.record_exception(error) + raise error + + return _stream_interceptor diff --git a/jina/serve/networking.py b/jina/serve/networking.py index 65a55572df78b..429d69dbb6c81 100644 --- a/jina/serve/networking.py +++ b/jina/serve/networking.py @@ -3,7 +3,7 @@ import os from collections import defaultdict from dataclasses import dataclass -from typing import Dict, List, Optional, Set, Tuple, Union +from typing import Dict, List, Optional, Sequence, Set, Tuple, Union from urllib.parse import urlparse import grpc @@ -27,6 +27,10 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: + from grpc.aio._interceptor import ClientInterceptor + from opentelemetry.instrumentation.grpc._client import ( + OpenTelemetryClientInterceptor, + ) from prometheus_client import CollectorRegistry, Summary @@ -58,6 +62,8 @@ def __init__( metrics: _NetworkingMetrics, logger, runtine_name: str, + aio_tracing_client_interceptors: Optional[Sequence['ClientInterceptor']] = None, + tracing_client_interceptor: Optional['OpenTelemetryClientInterceptor'] = None, ): self.runtime_name = runtine_name self._connections = [] @@ -67,6 +73,8 @@ def __init__( self._metrics = metrics self._logger = logger self._destroyed_event = asyncio.Event() + self.aio_tracing_client_interceptors = aio_tracing_client_interceptors + self.tracing_client_interceptors = tracing_client_interceptor async def reset_connection(self, address: str) -> Union[grpc.aio.Channel, None]: """ @@ -111,7 +119,9 @@ def add_connection(self, address: str): """ if address not in self._address_to_connection_idx: self._address_to_connection_idx[address] = len(self._connections) - stubs, channel = self._create_connection(address) + stubs, channel = self._create_connection( + address, + ) self._address_to_channel[address] = channel self._connections.append(stubs) @@ -159,6 +169,7 @@ def _create_connection(self, address): address, metrics=self._metrics, tls=use_tls, + aio_tracing_client_interceptors=self.aio_tracing_client_interceptors, ) return stubs, channel @@ -417,6 +428,12 @@ def __init__( runtime_name: str, logger: Optional[JinaLogger], metrics: _NetworkingMetrics, + aio_tracing_client_interceptors: Optional[ + Sequence['ClientInterceptor'] + ] = None, + tracing_client_interceptor: Optional[ + 'OpenTelemetryClientInterceptor' + ] = None, ): self._logger = logger # this maps deployments to shards or heads @@ -428,6 +445,8 @@ def __init__( if os.name != 'nt': os.unsetenv('http_proxy') os.unsetenv('https_proxy') + self.aio_tracing_client_interceptors = aio_tracing_client_interceptors + self.tracing_client_interceptor = tracing_client_interceptor def add_replica(self, deployment: str, shard_id: int, address: str): self._add_connection(deployment, shard_id, address, 'shards') @@ -528,7 +547,11 @@ def _add_connection( self._add_deployment(deployment) if entity_id not in 
self._deployments[deployment][type]: connection_list = ReplicaList( - self._metrics, self._logger, self.runtime_name + self._metrics, + self._logger, + self.runtime_name, + self.aio_tracing_client_interceptors, + self.tracing_client_interceptor, ) self._deployments[deployment][type][entity_id] = connection_list @@ -576,6 +599,8 @@ def __init__( logger: Optional[JinaLogger] = None, compression: Optional[str] = None, metrics_registry: Optional['CollectorRegistry'] = None, + aio_tracing_client_interceptors: Optional[Sequence['ClientInterceptor']] = None, + tracing_client_interceptor: Optional['OpenTelemetryClientInterceptor'] = None, ): self._logger = logger or JinaLogger(self.__class__.__name__) @@ -626,8 +651,14 @@ def __init__( send_requests_bytes_metrics, ) + self.aio_tracing_client_interceptors = aio_tracing_client_interceptors + self.tracing_client_interceptor = tracing_client_interceptor self._connections = self._ConnectionPoolMap( - runtime_name, self._logger, self._metrics + runtime_name, + self._logger, + self._metrics, + self.aio_tracing_client_interceptors, + self.tracing_client_interceptor, ) self._deployment_address_map = {} @@ -987,6 +1018,48 @@ async def task_wrapper(): return asyncio.create_task(task_wrapper()) + @staticmethod + def __aio_channel_with_tracing_interceptor( + address, + credentials=None, + options=None, + interceptors=None, + ) -> grpc.aio.Channel: + if credentials: + return grpc.aio.secure_channel( + address, + credentials, + options=options, + interceptors=interceptors, + ) + return grpc.aio.insecure_channel( + address, + options=options, + interceptors=interceptors, + ) + + @staticmethod + def __channel_with_tracing_interceptor( + address, + credentials=None, + options=None, + interceptor=None, + ) -> grpc.Channel: + if credentials: + channel = grpc.secure_channel(address, credentials, options=options) + else: + channel = grpc.insecure_channel(address, options=options) + + if interceptor: + from opentelemetry.instrumentation.grpc.grpcext import intercept_channel + + return intercept_channel( + channel, + interceptor, + ) + else: + return channel + @staticmethod def get_grpc_channel( address: str, @@ -994,6 +1067,8 @@ def get_grpc_channel( asyncio: bool = False, tls: bool = False, root_certificates: Optional[str] = None, + aio_tracing_client_interceptors: Optional[Sequence['ClientInterceptor']] = None, + tracing_client_interceptor: Optional['OpenTelemetryClientInterceptor'] = None, ) -> grpc.Channel: """ Creates a grpc channel to the given address @@ -1003,28 +1078,28 @@ def get_grpc_channel( :param asyncio: If True, use the asyncio implementation of the grpc channel :param tls: If True, use tls encryption for the grpc channel :param root_certificates: The path to the root certificates for tls, only used if tls is True - + :param aio_tracing_client_interceptors: List of async io gprc client tracing interceptors for tracing requests if asycnio is True + :param tracing_client_interceptor: A gprc client tracing interceptor for tracing requests if asyncio is False :return: A grpc channel or an asyncio channel """ - secure_channel = grpc.secure_channel - insecure_channel = grpc.insecure_channel - - if asyncio: - secure_channel = grpc.aio.secure_channel - insecure_channel = grpc.aio.insecure_channel - if options is None: options = GrpcConnectionPool.get_default_grpc_options() + credentials = None if tls: credentials = grpc.ssl_channel_credentials( root_certificates=root_certificates ) - return secure_channel(address, credentials, options) + if asyncio: + return 
GrpcConnectionPool.__aio_channel_with_tracing_interceptor( + address, credentials, options, aio_tracing_client_interceptors + ) - return insecure_channel(address, options) + return GrpcConnectionPool.__channel_with_tracing_interceptor( + address, credentials, options, tracing_client_interceptor + ) @staticmethod def send_request_sync( @@ -1202,6 +1277,7 @@ def create_async_channel_stub( metrics: _NetworkingMetrics, tls=False, root_certificates: Optional[str] = None, + aio_tracing_client_interceptors: Optional[Sequence['ClientInterceptor']] = None, ) -> Tuple[ConnectionStubs, grpc.aio.Channel]: """ Creates an async GRPC Channel. This channel has to be closed eventually! @@ -1210,6 +1286,7 @@ def create_async_channel_stub( :param tls: if True, use tls for the grpc channel :param root_certificates: the path to the root certificates for tls, only u :param metrics: NetworkingMetrics object that contain optional metrics + :param aio_tracing_client_interceptors: List of async io gprc client tracing interceptors for tracing requests for asycnio channel :returns: DataRequest stubs and an async grpc channel """ channel = GrpcConnectionPool.get_grpc_channel( @@ -1217,6 +1294,7 @@ def create_async_channel_stub( asyncio=True, tls=tls, root_certificates=root_certificates, + aio_tracing_client_interceptors=aio_tracing_client_interceptors, ) return ( diff --git a/jina/serve/runtimes/asyncio.py b/jina/serve/runtimes/asyncio.py index 1c0eddb88e290..870c78fae003c 100644 --- a/jina/serve/runtimes/asyncio.py +++ b/jina/serve/runtimes/asyncio.py @@ -11,6 +11,7 @@ from jina import __windows__ from jina.helper import send_telemetry_event from jina.importer import ImportExtensions +from jina.serve.instrumentation import InstrumentationMixin from jina.serve.networking import GrpcConnectionPool from jina.serve.runtimes.base import BaseRuntime from jina.serve.runtimes.monitoring import MonitoringMixin @@ -21,7 +22,7 @@ import threading -class AsyncNewLoopRuntime(BaseRuntime, MonitoringMixin, ABC): +class AsyncNewLoopRuntime(BaseRuntime, MonitoringMixin, InstrumentationMixin, ABC): """ The async runtime to start a new event loop. 
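# Illustrative sketch (not part of this changeset): obtaining a traced channel
# through the updated `GrpcConnectionPool.get_grpc_channel` helper shown
# above. The interceptor list is only honoured on the asyncio path; the sync
# path wraps the channel with `intercept_channel` instead. Address and
# interceptor choice below are arbitrary.
from opentelemetry import trace

from jina.serve.instrumentation._aio_client import UnaryUnaryAioClientInterceptor
from jina.serve.networking import GrpcConnectionPool

tracer = trace.get_tracer(__name__)
channel = GrpcConnectionPool.get_grpc_channel(
    'localhost:54321',
    asyncio=True,
    aio_tracing_client_interceptors=[UnaryUnaryAioClientInterceptor(tracer)],
)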
""" @@ -65,6 +66,15 @@ def __init__( ) self._setup_monitoring() + self._setup_instrumentation( + name=self.args.name, + tracing=self.args.tracing, + traces_exporter_host=self.args.traces_exporter_host, + traces_exporter_port=self.args.traces_exporter_port, + metrics=self.args.metrics, + metrics_exporter_host=self.args.metrics_exporter_host, + metrics_exporter_port=self.args.metrics_exporter_port, + ) send_telemetry_event(event='start', obj=self, entity_id=self._entity_id) self._start_time = time.time() self._loop.run_until_complete(self.async_setup()) @@ -77,8 +87,24 @@ def run_forever(self): """ self._loop.run_until_complete(self._loop_body()) + def _teardown_instrumentation(self): + try: + if self.tracing and self.tracer_provider: + if hasattr(self.tracer_provider, 'force_flush'): + self.tracer_provider.force_flush() + if hasattr(self.tracer_provider, 'shutdown'): + self.tracer_provider.shutdown() + if self.metrics and self.meter_provider: + if hasattr(self.meter_provider, 'force_flush'): + self.meter_provider.force_flush() + if hasattr(self.meter_provider, 'shutdown'): + self.meter_provider.shutdown() + except Exception as ex: + self.logger.warning(f'Exception during instrumentation teardown, {str(ex)}') + def teardown(self): """Call async_teardown() and stop and close the event loop.""" + self._teardown_instrumentation() self._loop.run_until_complete(self.async_teardown()) self._loop.stop() self._loop.close() diff --git a/jina/serve/runtimes/gateway/__init__.py b/jina/serve/runtimes/gateway/__init__.py index d79b837305ada..0ccf3e01cfe2f 100644 --- a/jina/serve/runtimes/gateway/__init__.py +++ b/jina/serve/runtimes/gateway/__init__.py @@ -13,15 +13,18 @@ from jina.parsers.helper import _set_gateway_uses from jina.serve.gateway import BaseGateway from jina.serve.runtimes.asyncio import AsyncNewLoopRuntime -from jina.serve.runtimes.gateway.grpc import GRPCGateway -from jina.serve.runtimes.gateway.http import HTTPGateway -from jina.serve.runtimes.gateway.websocket import WebSocketGateway if TYPE_CHECKING: import multiprocessing import threading +# Keep these imports even if not used, since YAML parser needs to find them in imported modules +from jina.serve.runtimes.gateway.grpc import GRPCGateway +from jina.serve.runtimes.gateway.http import HTTPGateway +from jina.serve.runtimes.gateway.websocket import WebSocketGateway + + class GatewayRuntime(AsyncNewLoopRuntime): """ The Gateway Runtime that starts a gateway pod. 
@@ -83,11 +86,16 @@ async def async_setup(self): extra_search_paths=self.args.extra_search_paths, ) - self.gateway.set_streamer( + self.gateway.inject_dependencies( args=self.args, timeout_send=self.timeout_send, metrics_registry=self.metrics_registry, runtime_name=self.args.name, + tracing=self.tracing, + tracer_provider=self.tracer_provider, + grpc_tracing_server_interceptors=self.aio_tracing_server_interceptors(), + aio_tracing_client_interceptors=self.aio_tracing_client_interceptors(), + tracing_client_interceptor=self.tracing_client_interceptor(), ) await self.gateway.setup_server() diff --git a/jina/serve/runtimes/gateway/grpc/__init__.py b/jina/serve/runtimes/gateway/grpc/__init__.py index 5301ab6305b22..214829005da5e 100644 --- a/jina/serve/runtimes/gateway/grpc/__init__.py +++ b/jina/serve/runtimes/gateway/grpc/__init__.py @@ -1 +1,3 @@ from jina.serve.runtimes.gateway.grpc.gateway import GRPCGateway + +__all__ = ['GRPCGateway'] diff --git a/jina/serve/runtimes/gateway/grpc/gateway.py b/jina/serve/runtimes/gateway/grpc/gateway.py index 81a5c62922723..6420736cb4016 100644 --- a/jina/serve/runtimes/gateway/grpc/gateway.py +++ b/jina/serve/runtimes/gateway/grpc/gateway.py @@ -8,10 +8,9 @@ from jina.helper import get_full_version from jina.proto import jina_pb2, jina_pb2_grpc from jina.serve.gateway import BaseGateway +from jina.serve.runtimes.helper import _get_grpc_server_options from jina.types.request.status import StatusMessage -from ...helper import _get_grpc_server_options - class GRPCGateway(BaseGateway): """GRPC Gateway implementation""" @@ -36,15 +35,17 @@ def __init__( self.grpc_server_options = grpc_server_options self.ssl_keyfile = ssl_keyfile self.ssl_certfile = ssl_certfile - self.server = grpc.aio.server( - options=_get_grpc_server_options(self.grpc_server_options) - ) self.health_servicer = health.HealthServicer(experimental_non_blocking=True) async def setup_server(self): """ setup GRPC server """ + self.server = grpc.aio.server( + options=_get_grpc_server_options(self.grpc_server_options), + interceptors=self.grpc_tracing_server_interceptors, + ) + jina_pb2_grpc.add_JinaRPCServicer_to_server( self.streamer._streamer, self.server ) diff --git a/jina/serve/runtimes/gateway/http/__init__.py b/jina/serve/runtimes/gateway/http/__init__.py index a7924cb8adc42..2155fc5eab1d4 100644 --- a/jina/serve/runtimes/gateway/http/__init__.py +++ b/jina/serve/runtimes/gateway/http/__init__.py @@ -1 +1,3 @@ from .gateway import HTTPGateway + +__all__ = ['HTTPGateway'] diff --git a/jina/serve/runtimes/gateway/http/app.py b/jina/serve/runtimes/gateway/http/app.py index 2ff5f9175d714..679a91151b7c2 100644 --- a/jina/serve/runtimes/gateway/http/app.py +++ b/jina/serve/runtimes/gateway/http/app.py @@ -11,6 +11,8 @@ from jina.logging.logger import JinaLogger if TYPE_CHECKING: + from opentelemetry import trace + from jina.serve.streamer import GatewayStreamer @@ -24,6 +26,8 @@ def get_fastapi_app( expose_graphql_endpoint: bool, cors: bool, logger: 'JinaLogger', + tracing: Optional[bool] = None, + tracer_provider: Optional['trace.TracerProvider'] = None, ): """ Get the app from FastAPI as the REST interface. @@ -39,6 +43,8 @@ def get_fastapi_app( :param expose_graphql_endpoint: If set, /graphql endpoint is added to HTTP interface. :param cors: If set, a CORS middleware is added to FastAPI frontend to allow cross-origin access. :param logger: Jina logger. + :param tracing: Enables tracing if set to True. 
+    :param tracer_provider: If tracing is enabled the tracer_provider will be used to instrument the code.
     :return: fastapi app
     """
     with ImportExtensions(required=True):
@@ -59,6 +65,11 @@ def get_fastapi_app(
         version=__version__,
     )

+    if tracing:
+        from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
+
+        FastAPIInstrumentor.instrument_app(app, tracer_provider=tracer_provider)
+
     if cors:
         app.add_middleware(
             CORSMiddleware,
diff --git a/jina/serve/runtimes/gateway/http/gateway.py b/jina/serve/runtimes/gateway/http/gateway.py
index 06e724fe080bf..eafc3f550944a 100644
--- a/jina/serve/runtimes/gateway/http/gateway.py
+++ b/jina/serve/runtimes/gateway/http/gateway.py
@@ -80,6 +80,8 @@ async def setup_server(self):
                 expose_graphql_endpoint=self.expose_graphql_endpoint,
                 cors=self.cors,
                 logger=self.logger,
+                tracing=self.tracing,
+                tracer_provider=self.tracer_provider,
             )
         )
diff --git a/jina/serve/runtimes/gateway/websocket/__init__.py b/jina/serve/runtimes/gateway/websocket/__init__.py
index 42fef32367b83..3fb0a5abad1f6 100644
--- a/jina/serve/runtimes/gateway/websocket/__init__.py
+++ b/jina/serve/runtimes/gateway/websocket/__init__.py
@@ -1 +1,4 @@
+from jina.serve.runtimes.gateway.websocket.app import get_fastapi_app
 from jina.serve.runtimes.gateway.websocket.gateway import WebSocketGateway
+
+__all__ = ['WebSocketGateway']
diff --git a/jina/serve/runtimes/gateway/websocket/app.py b/jina/serve/runtimes/gateway/websocket/app.py
index 2793e18898b51..1e69071e9dcc6 100644
--- a/jina/serve/runtimes/gateway/websocket/app.py
+++ b/jina/serve/runtimes/gateway/websocket/app.py
@@ -11,6 +11,8 @@
 from jina.types.request.status import StatusMessage

 if TYPE_CHECKING:
+    from opentelemetry import trace
+
     from jina.serve.streamer import GatewayStreamer


@@ -24,12 +26,16 @@ def _fits_ws_close_msg(msg: str):
 def get_fastapi_app(
     streamer: 'GatewayStreamer',
     logger: 'JinaLogger',
+    tracing: Optional[bool] = None,
+    tracer_provider: Optional['trace.TracerProvider'] = None,
 ):
     """
     Get the app from FastAPI as the Websocket interface.

     :param streamer: gateway streamer object.
     :param logger: Jina logger.
+    :param tracing: Enables tracing if set to True.
+    :param tracer_provider: If tracing is enabled the tracer_provider will be used to instrument the code.
:return: fastapi app """ @@ -109,6 +115,11 @@ async def send( app = FastAPI() + if tracing: + from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor + + FastAPIInstrumentor.instrument_app(app, tracer_provider=tracer_provider) + @app.get( path='/', summary='Get the health of Jina service', diff --git a/jina/serve/runtimes/gateway/websocket/gateway.py b/jina/serve/runtimes/gateway/websocket/gateway.py index c49d24b88ef4e..6bf8baeda9f09 100644 --- a/jina/serve/runtimes/gateway/websocket/gateway.py +++ b/jina/serve/runtimes/gateway/websocket/gateway.py @@ -49,6 +49,8 @@ async def setup_server(self): get_fastapi_app( streamer=self.streamer, logger=self.logger, + tracing=self.tracing, + tracer_provider=self.tracer_provider, ) ) diff --git a/jina/serve/runtimes/head/__init__.py b/jina/serve/runtimes/head/__init__.py index 7471cc62bb987..5c5b286632f35 100644 --- a/jina/serve/runtimes/head/__init__.py +++ b/jina/serve/runtimes/head/__init__.py @@ -16,6 +16,7 @@ from jina.helper import get_full_version from jina.importer import ImportExtensions from jina.proto import jina_pb2, jina_pb2_grpc +from jina.serve.instrumentation import InstrumentationMixin from jina.serve.networking import GrpcConnectionPool from jina.serve.runtimes.asyncio import AsyncNewLoopRuntime from jina.serve.runtimes.helper import _get_grpc_server_options @@ -144,7 +145,8 @@ def _default_polling_dict(self, default_polling): async def async_setup(self): """Wait for the GRPC server to start""" self._grpc_server = grpc.aio.server( - options=_get_grpc_server_options(self.args.grpc_server_options) + options=_get_grpc_server_options(self.args.grpc_server_options), + interceptors=self.aio_tracing_server_interceptors(), ) jina_pb2_grpc.add_JinaSingleDataRequestRPCServicer_to_server( diff --git a/jina/serve/runtimes/request_handlers/data_request_handler.py b/jina/serve/runtimes/request_handlers/data_request_handler.py index bc77718ad4320..35497e5eb39d4 100644 --- a/jina/serve/runtimes/request_handlers/data_request_handler.py +++ b/jina/serve/runtimes/request_handlers/data_request_handler.py @@ -1,5 +1,4 @@ -import copy -from typing import TYPE_CHECKING, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Dict, List, Optional from docarray import DocumentArray @@ -12,6 +11,8 @@ if TYPE_CHECKING: import argparse + from opentelemetry import metrics, trace + from opentelemetry.context.context import Context from prometheus_client import CollectorRegistry from jina.logging.logger import JinaLogger @@ -27,6 +28,8 @@ def __init__( args: 'argparse.Namespace', logger: 'JinaLogger', metrics_registry: Optional['CollectorRegistry'] = None, + tracer_provider: Optional['trace.TracerProvider'] = None, + meter_provider: Optional['metrics.MeterProvider'] = None, **kwargs, ): """Initialize private parameters and execute private loading functions. 
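# Illustrative sketch (not part of this changeset): a self-contained view of
# the `FastAPIInstrumentor.instrument_app` call used for both the HTTP and
# WebSocket gateway apps above. It creates a server span for every incoming
# request; the route below is made up.
from fastapi import FastAPI
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.sdk.trace import TracerProvider

app = FastAPI()
tracer_provider = TracerProvider()
FastAPIInstrumentor.instrument_app(app, tracer_provider=tracer_provider)


@app.get('/status')
def status():
    # handled inside an automatically created server span
    return {'ok': True}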
@@ -34,6 +37,8 @@ def __init__( :param args: args from CLI :param logger: the logger provided by the user :param metrics_registry: optional metrics registry for prometheus used if we need to expose metrics from the executor of from the data request handler + :param tracer_provider: Optional tracer_provider that will be provided to the executor for tracing + :param meter_provider: Optional meter_provider that will be provided to the executor for metrics :param kwargs: extra keyword arguments """ super().__init__() @@ -41,7 +46,11 @@ def __init__( self.args.parallel = self.args.shards self.logger = logger self._is_closed = False - self._load_executor(metrics_registry) + self._load_executor( + metrics_registry=metrics_registry, + tracer_provider=tracer_provider, + meter_provider=meter_provider, + ) self._init_monitoring(metrics_registry) def _init_monitoring(self, metrics_registry: Optional['CollectorRegistry'] = None): @@ -85,10 +94,17 @@ def _init_monitoring(self, metrics_registry: Optional['CollectorRegistry'] = Non self._request_size_metrics = None self._sent_response_size_metrics = None - def _load_executor(self, metrics_registry: Optional['CollectorRegistry'] = None): + def _load_executor( + self, + metrics_registry: Optional['CollectorRegistry'] = None, + tracer_provider: Optional['trace.TracerProvider'] = None, + meter_provider: Optional['metrics.MeterProvider'] = None, + ): """ Load the executor to this runtime, specified by ``uses`` CLI argument. :param metrics_registry: Optional prometheus metrics registry that will be passed to the executor so that it can expose metrics + :param tracer_provider: Optional tracer_provider that will be provided to the executor for tracing + :param meter_provider: Optional meter_provider that will be provided to the executor for metrics """ try: self._executor: BaseExecutor = BaseExecutor.load_config( @@ -103,6 +119,8 @@ def _load_executor(self, metrics_registry: Optional['CollectorRegistry'] = None) 'replicas': self.args.replicas, 'name': self.args.name, 'metrics_registry': metrics_registry, + 'tracer_provider': tracer_provider, + 'meter_provider': meter_provider, }, py_modules=self.args.py_modules, extra_search_paths=self.args.extra_search_paths, @@ -128,10 +146,13 @@ def _parse_params(parameters: Dict, executor_name: str): return parsed_params - async def handle(self, requests: List['DataRequest']) -> DataRequest: + async def handle( + self, requests: List['DataRequest'], tracing_context: 'Context' = None + ) -> DataRequest: """Initialize private parameters and execute private loading functions. :param requests: The messages to handle containing a DataRequest + :param tracing_context: OpenTelemetry tracing context from the originating request. 
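# Illustrative sketch (not part of this changeset): the `tracing_context`
# forwarded by `handle` ends up as a keyword argument of the Executor's
# request method, where it can parent custom spans. This mirrors the
# ExecutorTestWithTracing helper further down in this diff; `self.tracer`
# being populated when tracing is enabled is assumed from that helper.
from typing import Optional

from docarray import DocumentArray
from opentelemetry.context.context import Context

from jina import Executor, requests


class MyInstrumentedExecutor(Executor):
    @requests(on='/index')
    def index(
        self, docs: DocumentArray, tracing_context: Optional[Context] = None, **kwargs
    ):
        if self.tracer:
            # child span of the request span created by the serving runtime
            with self.tracer.start_as_current_span(
                'index-docs', context=tracing_context
            ) as span:
                span.set_attribute('len_docs', len(docs))
        return docs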
:returns: the processed message """ # skip executor if endpoints mismatch @@ -169,6 +190,7 @@ async def handle(self, requests: List['DataRequest']) -> DataRequest: requests, field='docs', ), + tracing_context=tracing_context, ) # assigning result back to request if return_data is not None: diff --git a/jina/serve/runtimes/worker/__init__.py b/jina/serve/runtimes/worker/__init__.py index 644eee19245ea..0864c212fa7f0 100644 --- a/jina/serve/runtimes/worker/__init__.py +++ b/jina/serve/runtimes/worker/__init__.py @@ -1,7 +1,7 @@ import argparse import contextlib from abc import ABC -from typing import List +from typing import TYPE_CHECKING, List import grpc from grpc_health.v1 import health, health_pb2, health_pb2_grpc @@ -16,6 +16,9 @@ from jina.serve.runtimes.request_handlers.data_request_handler import DataRequestHandler from jina.types.request.data import DataRequest +if TYPE_CHECKING: + from opentelemetry.propagate import Context + class WorkerRuntime(AsyncNewLoopRuntime, ABC): """Runtime procedure leveraging :class:`Grpclet` for sending DataRequests""" @@ -79,7 +82,11 @@ async def async_setup(self): # otherwise readiness check is not valid # The DataRequestHandler needs to be started BEFORE the grpc server self._data_request_handler = DataRequestHandler( - self.args, self.logger, self.metrics_registry + self.args, + self.logger, + self.metrics_registry, + self.tracer_provider, + self.meter_provider, ) await self._async_setup_grpc_server() @@ -89,7 +96,8 @@ async def _async_setup_grpc_server(self): """ self._grpc_server = grpc.aio.server( - options=_get_grpc_server_options(self.args.grpc_server_options) + options=_get_grpc_server_options(self.args.grpc_server_options), + interceptors=self.aio_tracing_server_interceptors(), ) jina_pb2_grpc.add_JinaSingleDataRequestRPCServicer_to_server( @@ -165,6 +173,13 @@ async def endpoint_discovery(self, empty, context) -> jina_pb2.EndpointsProto: ) return endpointsProto + @staticmethod + def _extract_tracing_context(metadata: grpc.aio.Metadata) -> 'Context': + from opentelemetry.propagate import extract + + context = extract(dict(metadata)) + return context + async def process_data(self, requests: List[DataRequest], context) -> DataRequest: """ Process the received requests and return the result as a new request @@ -179,7 +194,12 @@ async def process_data(self, requests: List[DataRequest], context) -> DataReques if self.logger.debug_enabled: self._log_data_request(requests[0]) - result = await self._data_request_handler.handle(requests=requests) + tracing_context = WorkerRuntime._extract_tracing_context( + context.invocation_metadata() + ) + result = await self._data_request_handler.handle( + requests=requests, tracing_context=tracing_context + ) if self._successful_requests_metrics: self._successful_requests_metrics.inc() return result diff --git a/jina/serve/streamer.py b/jina/serve/streamer.py index b1cc562bc76a6..8e4cbd7133546 100644 --- a/jina/serve/streamer.py +++ b/jina/serve/streamer.py @@ -1,4 +1,4 @@ -from typing import TYPE_CHECKING, Dict, List, Optional, Union +from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Union from docarray import DocumentArray @@ -11,6 +11,10 @@ __all__ = ['GatewayStreamer'] if TYPE_CHECKING: + from grpc.aio._interceptor import ClientInterceptor + from opentelemetry.instrumentation.grpc._client import ( + OpenTelemetryClientInterceptor, + ) from prometheus_client import CollectorRegistry @@ -32,6 +36,8 @@ def __init__( prefetch: int = 0, logger: Optional['JinaLogger'] = None, metrics_registry: 
Optional['CollectorRegistry'] = None,
+        aio_tracing_client_interceptors: Optional[Sequence['ClientInterceptor']] = None,
+        tracing_client_interceptor: Optional['OpenTelemetryClientInterceptor'] = None,
     ):
         """
         :param graph_representation: A dictionary describing the topology of the Deployments. 2 special nodes are expected, the name `start-gateway` and `end-gateway` to
@@ -47,6 +53,8 @@ def __init__(
         :param prefetch: How many Requests are processed from the Client at the same time.
         :param logger: Optional logger that can be used for logging
         :param metrics_registry: optional metrics registry for prometheus used if we need to expose metrics
+        :param aio_tracing_client_interceptors: Optional list of aio grpc client tracing interceptors.
+        :param tracing_client_interceptor: Optional grpc client tracing interceptor.
         """
         topology_graph = self._create_topology_graph(
             graph_representation,
@@ -56,9 +64,16 @@ def __init__(
             retries,
         )
         self.runtime_name = runtime_name
+        self.aio_tracing_client_interceptors = aio_tracing_client_interceptors
+        self.tracing_client_interceptor = tracing_client_interceptor
         self._connection_pool = self._create_connection_pool(
-            executor_addresses, compression, metrics_registry, logger
+            executor_addresses,
+            compression,
+            metrics_registry,
+            logger,
+            aio_tracing_client_interceptors,
+            tracing_client_interceptor,
         )
         request_handler = RequestHandler(metrics_registry, runtime_name)
@@ -90,7 +105,13 @@ def _create_topology_graph(
         )
     def _create_connection_pool(
-        self, deployments_addresses, compression, metrics_registry, logger
+        self,
+        deployments_addresses,
+        compression,
+        metrics_registry,
+        logger,
+        aio_tracing_client_interceptors,
+        tracing_client_interceptor,
     ):
         # add the connections needed
         connection_pool = GrpcConnectionPool(
@@ -98,6 +119,8 @@ def _create_connection_pool(
             logger=logger,
             compression=compression,
             metrics_registry=metrics_registry,
+            aio_tracing_client_interceptors=aio_tracing_client_interceptors,
+            tracing_client_interceptor=tracing_client_interceptor,
         )
         for deployment_name, addresses in deployments_addresses.items():
             for address in addresses:
diff --git a/jina_cli/autocomplete.py b/jina_cli/autocomplete.py
index 445cf775363b8..47f52d815fd55 100644
--- a/jina_cli/autocomplete.py
+++ b/jina_cli/autocomplete.py
@@ -62,6 +62,12 @@
             '--port-monitoring',
             '--retries',
             '--floating',
+            '--tracing',
+            '--traces-exporter-host',
+            '--traces-exporter-port',
+            '--metrics',
+            '--metrics-exporter-host',
+            '--metrics-exporter-port',
             '--install-requirements',
             '--force-update',
             '--force',
@@ -159,6 +165,12 @@
             '--port-monitoring',
             '--retries',
             '--floating',
+            '--tracing',
+            '--traces-exporter-host',
+            '--traces-exporter-port',
+            '--metrics',
+            '--metrics-exporter-host',
+            '--metrics-exporter-port',
         ],
         'auth login': ['--help', '--force'],
         'auth logout': ['--help'],
@@ -269,6 +281,12 @@
             '--port-monitoring',
             '--retries',
             '--floating',
+            '--tracing',
+            '--traces-exporter-host',
+            '--traces-exporter-port',
+            '--metrics',
+            '--metrics-exporter-host',
+            '--metrics-exporter-port',
             '--install-requirements',
             '--force-update',
             '--force',
@@ -323,6 +341,12 @@
             '--port-monitoring',
             '--retries',
             '--floating',
+            '--tracing',
+            '--traces-exporter-host',
+            '--traces-exporter-port',
+            '--metrics',
+            '--metrics-exporter-host',
+            '--metrics-exporter-port',
             '--install-requirements',
             '--force-update',
             '--force',
@@ -346,6 +370,12 @@
             '--port',
             '--tls',
             '--asyncio',
+            '--tracing',
+            '--traces-exporter-host',
+            '--traces-exporter-port',
+            '--metrics',
+            '--metrics-exporter-host',
+
'--metrics-exporter-port', '--protocol', ], }, diff --git a/tests/conftest.py b/tests/conftest.py index 032053ade9c4e..021139385f22b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -88,3 +88,4 @@ def event_loop(request): loop = asyncio.get_event_loop_policy().new_event_loop() yield loop loop.close() + diff --git a/tests/integration/instrumentation/__init__.py b/tests/integration/instrumentation/__init__.py new file mode 100644 index 0000000000000..58a5a9b709f68 --- /dev/null +++ b/tests/integration/instrumentation/__init__.py @@ -0,0 +1,107 @@ +from typing import Dict, Optional + +from docarray import DocumentArray +from opentelemetry.context.context import Context + +from jina import Executor, requests + + +def get_traces(service): + import requests + + response = requests.get(f'http://localhost:16686/api/traces?service={service}') + response.raise_for_status() + return response.json().get('data', []) or [] + + +def _get_trace_id(any_object): + return any_object.get('traceID', '') + + +def get_trace_ids(traces): + trace_ids = set() + for trace in traces: + trace_ids.add(_get_trace_id(trace)) + for span in trace['spans']: + trace_ids.add(_get_trace_id(span)) + + return trace_ids + + +def partition_spans_by_kind(traces): + '''Returns three lists each containing spans of kind SpanKind.SERVER, SpanKind.CLIENT and SpandKind.INTERNAL''' + server_spans = [] + client_spans = [] + internal_spans = [] + + for trace in traces: + for span in trace['spans']: + for tag in span['tags']: + if 'span.kind' == tag.get('key', ''): + span_kind = tag.get('value', '') + if 'server' == span_kind: + server_spans.append(span) + elif 'client' == span_kind: + client_spans.append(span) + elif 'internal' == span_kind: + internal_spans.append(span) + + return (server_spans, client_spans, internal_spans) + + +class ExecutorTestWithTracing(Executor): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + @requests(on='/index') + def empty( + self, docs: 'DocumentArray', tracing_context: Optional[Context], **kwargs + ): + if self.tracer: + with self.tracer.start_span('dummy', context=tracing_context) as span: + span.set_attribute('len_docs', len(docs)) + return docs + else: + return docs + + +def get_services(): + import requests + + response = requests.get('http://localhost:16686/api/services') + response.raise_for_status() + response_json = response.json() + services = response_json.get('data', []) or [] + return [service for service in services if service != 'jaeger-query'] + + +class ExecutorFailureWithTracing(Executor): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.failure_counter = 0 + + @requests(on='/index') + def empty( + self, docs: 'DocumentArray', tracing_context: Optional[Context], **kwargs + ): + if self.tracer: + with self.tracer.start_span('dummy', context=tracing_context) as span: + span.set_attribute('len_docs', len(docs)) + if not self.failure_counter: + self.failure_counter += 1 + raise NotImplementedError + else: + return docs + else: + return docs + + +def spans_with_error(spans): + error_spans = [] + for span in spans: + for tag in span['tags']: + if 'otel.status_code' == tag.get('key', '') and 'ERROR' == tag.get( + 'value', '' + ): + error_spans.append(span) + return error_spans diff --git a/tests/integration/instrumentation/conftest.py b/tests/integration/instrumentation/conftest.py new file mode 100644 index 0000000000000..c29613a9b203e --- /dev/null +++ b/tests/integration/instrumentation/conftest.py @@ -0,0 +1,17 @@ +import 
os +import time + +import pytest + + +@pytest.fixture() +def otlp_collector(): + file_dir = os.path.dirname(__file__) + os.system( + f"docker-compose -f {os.path.join(file_dir, 'docker-compose.yml')} up -d --remove-orphans" + ) + time.sleep(1) + yield + os.system( + f"docker-compose -f {os.path.join(file_dir, 'docker-compose.yml')} down --remove-orphans" + ) diff --git a/tests/integration/instrumentation/docker-compose.yml b/tests/integration/instrumentation/docker-compose.yml new file mode 100644 index 0000000000000..b8936584042ec --- /dev/null +++ b/tests/integration/instrumentation/docker-compose.yml @@ -0,0 +1,24 @@ +version: "3" +services: + # Jaeger + jaeger: + image: jaegertracing/all-in-one:latest + ports: + - "16686:16686" + - "14250" + + otel-collector: + image: otel/opentelemetry-collector:0.61.0 + command: [ "--config=/etc/otel-collector-config.yml" ] + volumes: + - ./otel-collector-config.yml:/etc/otel-collector-config.yml + ports: + - "1888:1888" # pprof extension + - "8888:8888" # Prometheus metrics exposed by the collector + - "8889:8889" # Prometheus exporter metrics + - "13133:13133" # health_check extension + - "55679:55679" # zpages extension + - "4317:4317" # OTLP gRPC receiver + - "4318:4318" # OTLP http receiver + depends_on: + - jaeger diff --git a/tests/integration/instrumentation/otel-collector-config.yml b/tests/integration/instrumentation/otel-collector-config.yml new file mode 100644 index 0000000000000..b7d34404d555c --- /dev/null +++ b/tests/integration/instrumentation/otel-collector-config.yml @@ -0,0 +1,26 @@ +receivers: + otlp: + protocols: + grpc: + +exporters: + jaeger: + endpoint: jaeger:14250 + tls: + insecure: true + +processors: + batch: + +extensions: + health_check: + pprof: + zpages: + +service: + extensions: [pprof, zpages, health_check] + pipelines: + traces: + receivers: [otlp] + exporters: [jaeger] + processors: [batch] \ No newline at end of file diff --git a/tests/integration/instrumentation/test_flow_instrumentation.py b/tests/integration/instrumentation/test_flow_instrumentation.py new file mode 100644 index 0000000000000..392791800c2d5 --- /dev/null +++ b/tests/integration/instrumentation/test_flow_instrumentation.py @@ -0,0 +1,92 @@ +import time + +import pytest + +from jina import Flow +from tests.integration.instrumentation import ( + ExecutorFailureWithTracing, + ExecutorTestWithTracing, + get_services, + get_trace_ids, + get_traces, + partition_spans_by_kind, + spans_with_error, +) + + +@pytest.mark.parametrize( + 'protocol, client_type, num_internal_spans', + [ + ('grpc', 'GRPCClient', 2), + ('http', 'HTTPClient', 5), + ('websocket', 'WebSocketClient', 7), + ], +) +def test_gateway_instrumentation( + otlp_collector, protocol, client_type, num_internal_spans +): + f = Flow( + protocol=protocol, + tracing=True, + traces_exporter_host='localhost', + traces_exporter_port=4317, + ).add( + uses=ExecutorTestWithTracing, + tracing=True, + traces_exporter_host='localhost', + traces_exporter_port=4317, + ) + + with f: + from jina import DocumentArray + + f.post(f'/index', DocumentArray.empty(2), continue_on_error=True) + # give some time for the tracing and metrics exporters to finish exporting. 
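# Illustrative sketch (not part of this changeset): the assertions in these
# tests read traces back through Jaeger's HTTP query API on port 16686, which
# the docker-compose stack above exposes. A polling variant of the
# `get_traces` helper (the function name and timeout are made up):
import time

import requests


def wait_for_traces(service: str, timeout: float = 30.0) -> list:
    """Poll Jaeger until at least one trace for ``service`` is reported."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        resp = requests.get(f'http://localhost:16686/api/traces?service={service}')
        resp.raise_for_status()
        traces = resp.json().get('data', []) or []
        if traces:
            return traces
        time.sleep(1)
    return []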
+ # the client is slow to export the data + time.sleep(8) + + services = get_services() + expected_services = ['executor0/rep-0', 'gateway/rep-0', client_type] + assert len(services) == 3 + assert set(services).issubset(expected_services) + + client_traces = get_traces(client_type) + (server_spans, client_spans, internal_spans) = partition_spans_by_kind( + client_traces + ) + assert len(server_spans) == 5 + assert len(client_spans) == 5 + assert len(internal_spans) == num_internal_spans + + trace_ids = get_trace_ids(client_traces) + assert len(trace_ids) == 1 + + +def test_executor_instrumentation(otlp_collector): + f = Flow( + tracing=True, + traces_exporter_host='localhost', + traces_exporter_port=4317, + ).add(uses=ExecutorFailureWithTracing) + + with f: + from jina import DocumentArray + + f.post(f'/index', DocumentArray.empty(2), continue_on_error=True) + # give some time for the tracing and metrics exporters to finish exporting. + # the client is slow to export the data + time.sleep(8) + + client_type = 'GRPCClient' + client_traces = get_traces(client_type) + (server_spans, client_spans, internal_spans) = partition_spans_by_kind( + client_traces + ) + assert len(spans_with_error(server_spans)) == 0 + assert len(spans_with_error(client_spans)) == 0 + assert len(internal_spans) == 2 + # Errors reported by DataRequestHandler and request method level spans + assert len(spans_with_error(internal_spans)) == 2 + + trace_ids = get_trace_ids(client_traces) + assert len(trace_ids) == 1 diff --git a/tests/unit/serve/executors/test_decorators.py b/tests/unit/serve/executors/test_decorators.py index 237260a418c2f..ebc6660564366 100644 --- a/tests/unit/serve/executors/test_decorators.py +++ b/tests/unit/serve/executors/test_decorators.py @@ -3,9 +3,9 @@ import pytest from jina.helper import iscoroutinefunction -from jina.serve.executors import get_default_metas, get_executor_taboo +from jina.serve.executors import get_executor_taboo from jina.serve.executors.decorators import requests -from jina.serve.helper import store_init_kwargs, wrap_func +from jina.serve.helper import store_init_kwargs def test_store_init_kwargs():
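# Illustrative sketch (not part of this changeset): the new flags used
# end-to-end, following the integration tests above. It assumes an OTLP
# collector (for example the docker-compose stack in this diff) listening on
# localhost:4317 and forwarding traces to Jaeger.
from jina import DocumentArray, Flow

f = Flow(
    protocol='grpc',
    tracing=True,
    traces_exporter_host='localhost',
    traces_exporter_port=4317,
).add(
    tracing=True,
    traces_exporter_host='localhost',
    traces_exporter_port=4317,
)

with f:
    # each hop (client -> gateway -> executor) appears in a single trace
    f.post('/index', DocumentArray.empty(2))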