Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(instrumentation): add OpenTelemetry tracing and metrics with basic configurations #5175

Merged
merged 90 commits into from Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
90 commits
Select commit Hold shift + click to select a range
a5a7f42
feat(instrumentation): create basic tracer and meter with console exp…
Sep 15, 2022
9e1b2d0
style: fix overload and cli autocomplete
jina-bot Sep 15, 2022
514792a
feat(instrumentation): move the instrumentation package to the serve …
Sep 16, 2022
c3b0c37
feat(instrumentation): provide options to enable tracing and metrics …
Sep 16, 2022
2269b57
feat(instrumentation): add the correct grpc opentelmetery insturmenta…
Sep 19, 2022
14cb744
feat(serve): instrument grpc server and channel with interceptors
Sep 19, 2022
f53be22
style: fix overload and cli autocomplete
jina-bot Sep 19, 2022
a4a4621
feat(instrumentation): provide opentelemety context from the grpc cli…
Sep 20, 2022
78efb44
feat(instrumentation): check for opentelemetry environment variables …
Sep 20, 2022
7116e9f
feat(instrumentation): create InstrumentationMixin for server and cli…
Sep 20, 2022
92d3679
chore(instrumentation): use absolute module import
Sep 21, 2022
eb0ccd3
feat(instrumentation): trace http and websocket server and clients
Sep 21, 2022
38cae61
chore(instrumentation): update/add new opentelemetry arguments
Sep 21, 2022
45d1794
feat(instrumentation): globally disable tracing health check requests
Sep 21, 2022
b107f80
feat(instrumentation): add InstrumentationMixIn for Head and Worker r…
Sep 22, 2022
cd17588
feat(instrumentation): disable tracing of ServerReflection and endpoi…
Sep 26, 2022
2e44270
test(instrumentation): add basic tracing and metrics tests for HTTP G…
Sep 26, 2022
a083146
test(instrumentation): move test common code for tracing and metrics …
Sep 26, 2022
30ee9e3
feat(instrumentation): enable tracing of flow internal and start up r…
Sep 26, 2022
3998e2f
test(instrumentation): move test common code to new base class
Sep 26, 2022
30409c2
test(instrumentation): test grpc gateway opentelemety instrumentation
Sep 26, 2022
e2ee862
feat(instrumentation): add Jaeger export agent and required configura…
Sep 27, 2022
a0bfaf8
chore(instrumentation): remove print statement
Sep 27, 2022
60be044
test(instrumentation): document spans in the grpc and http gateway in…
Sep 27, 2022
9da9eaf
Merge branch 'master' into feat-instrumentation-5155
Sep 27, 2022
adb96ba
style: fix overload and cli autocomplete
jina-bot Sep 27, 2022
0af8ffb
chore: remove print statement
Sep 27, 2022
a241f62
test(instrumentation): add instrumentaiton tests for websocket gateway
Sep 27, 2022
528e38b
fix: import openetelmetry api globally and the other dependencies onl…
Sep 27, 2022
47ed0a8
fix: use class name as default name when creating Executor instrument…
Sep 27, 2022
3f436da
fix: provide argparse arguments to AlternativeGateway
Sep 27, 2022
578e882
style: fix overload and cli autocomplete
jina-bot Sep 27, 2022
aa5a34a
style: fix overload and cli autocomplete
Sep 28, 2022
87c15f5
Merge branch 'master' into feat-instrumentation-5155
Sep 28, 2022
f7b4af4
style: fix overload and cli autocomplete
jina-bot Sep 28, 2022
3a2e1de
style: fix overload and cli autocomplete
Sep 28, 2022
82dad9c
style: fix overload and cli autocomplete
jina-bot Sep 28, 2022
42d00e6
fix: revert changes for Gateway implementation
Sep 29, 2022
9ade3b6
Merge branch 'master' into feat-instrumentation-5155
Sep 29, 2022
4132396
feat(instrumentation): remove init method from InstrumentationMixin
Sep 29, 2022
4efbbd7
feat(instrumentation): create vendor neutral opentelemetry export arg…
Sep 29, 2022
8e9abcb
style: fix overload and cli autocomplete
Sep 29, 2022
8eed211
feat(instrumentation): inject tracing variables from AsyncLoopRuntime…
Sep 30, 2022
175a399
style: fix overload and cli autocomplete
jina-bot Sep 30, 2022
030b980
feat(instrumentation): configure a OTLP collector for exporting trace…
Sep 30, 2022
c686498
style: fix overload and cli autocomplete
jina-bot Sep 30, 2022
6d21a3a
feat(instrumentation): return None for aio server interceptors if tra…
Oct 4, 2022
00c6c12
test: fix handling of optional args
Oct 5, 2022
92c0e1f
Merge branch 'master' into feat-instrumentation-5155
Oct 5, 2022
6e27829
fix: remove print debug statement
Oct 5, 2022
366a20e
fix: fix gateway class loading
alaeddine-13 Oct 5, 2022
822b541
Merge branch 'feat-instrumentation-5155' of github.com:jina-ai/jina i…
alaeddine-13 Oct 5, 2022
963b82d
feat(instrumentation): fix BaseGateway telemetry dependency injection
Oct 5, 2022
6433930
fix: fix WebsocketGateway loading
alaeddine-13 Oct 5, 2022
ffadb73
fix(instrumentation): correctly handle default executor runtime_args
Oct 5, 2022
3f6eeff
test(instrumentation): add integration tests for grpc, http and webso…
Oct 5, 2022
6b35909
test(instrumentation): parameterize instrumentation tests
Oct 5, 2022
2906369
test(instrumentation): remove outdated tests replaced by parametrized…
Oct 6, 2022
f1ad7a2
fix(instrumentation): fix executor instrumentation setup
Oct 6, 2022
d7bb8d9
fix(instrumentation): force spawn process when running flows in param…
Oct 6, 2022
5e31dca
feat(instrumentation): omit opentelemetry from cli args
Oct 6, 2022
c23f30a
style: fix overload and cli autocomplete
jina-bot Oct 6, 2022
bcc39a8
test: small test refactoring
JoanFM Oct 6, 2022
c540628
Merge branch 'master' into feat-instrumentation-5155
Oct 6, 2022
2ce9c67
style: fix overload and cli autocomplete
Oct 6, 2022
adcb457
style: fix overload and cli autocomplete
jina-bot Oct 6, 2022
0ae5f99
Merge branch 'master' into feat-instrumentation-5155
Oct 6, 2022
b45de43
test: dont set multiprocessing start method to spawn
Oct 6, 2022
bbd2fb8
fix: hide opentelemetry imports
Oct 6, 2022
222cfb9
Merge branch 'master' into feat-instrumentation-5155
JoanFM Oct 6, 2022
dcf7296
fix(runtimes): shutdown instrumentation exporters during teardown
Oct 7, 2022
57be55e
test: spawn processes by default in tests
Oct 7, 2022
7266abc
Merge branch 'feat-instrumentation-5155' of github.com:jina-ai/jina i…
Oct 7, 2022
e9e78ae
fix: provide client and server interceptors only when tracing is ena…
Oct 7, 2022
3656afc
Merge branch 'master' into feat-instrumentation-5155
Oct 7, 2022
4f83c47
fix(serve): correctly handle default instrumentation runtime_args
Oct 7, 2022
a9d5b1b
chore: hide opentelemetry imports under TYPE_CHECKING
Oct 7, 2022
a706480
test: avoid using spawn
JoanFM Oct 7, 2022
ef4a232
fix: add explicit type info and hide imports
Oct 7, 2022
1c0aedd
fix(executors): handle optional runtime_args correctly
Oct 7, 2022
c292234
chore: rename otel_context to tracing_context
Oct 7, 2022
01d543b
feat: use None instead of NoOp tracer and meter implementations
Oct 10, 2022
4afc51b
fix: remove unused import
Oct 10, 2022
70146e4
feat: add default tracing span for DataRequestHandler handle invocation
Oct 10, 2022
7f20c06
test: add test case to verify exception recording in a span
Oct 10, 2022
550a975
fix: use continue_on_error instead of try-except-pass
Oct 10, 2022
b644004
Merge branch 'master' into feat-instrumentation-5155
girishc13 Oct 10, 2022
d55d86c
chore: rename method name to match returning a list
Oct 11, 2022
132a932
fix: rename span_exporter args to traces_exporter
Oct 11, 2022
bb0b003
style: fix overload and cli autocomplete
jina-bot Oct 11, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 12 additions & 9 deletions jina/serve/executors/__init__.py
Expand Up @@ -15,11 +15,9 @@
from jina.importer import ImportExtensions
from jina.jaml import JAML, JAMLCompatible, env_var_regex, internal_var_regex
from jina.logging.logger import JinaLogger
from jina.serve.executors.decorators import (
avoid_concurrent_lock_cls,
store_init_kwargs,
wrap_func,
)
from jina.serve.executors.decorators import avoid_concurrent_lock_cls, requests
from jina.serve.executors.metas import get_default_metas, get_executor_taboo
from jina.serve.helper import store_init_kwargs, wrap_func

if TYPE_CHECKING:
from prometheus_client import Summary
Expand Down Expand Up @@ -190,12 +188,17 @@ def _init_instrumentation(self, _runtime_args: Optional[Dict]):
else self.__class__.__name__
)

self.tracer_provider = _runtime_args.get(
'tracer_provider', trace.NoOpTracerProvider()
self.tracer_provider = (
_runtime_args['tracer_provider']
if hasattr(_runtime_args, 'tracer_provider')
else trace.NoOpTracerProvider()
)

self.tracer = self.tracer_provider.get_tracer(instrumentating_module_name)
self.meter_provider = _runtime_args.get(
'meter_provider', metrics.NoOpMeterProvider()
self.meter_provider = (
_runtime_args['meter_provider']
if hasattr(_runtime_args, 'meter_provider')
else metrics.NoOpMeterProvider()
)
self.meter = self.meter_provider.get_meter(instrumentating_module_name)

Expand Down
6 changes: 3 additions & 3 deletions jina/serve/gateway.py
@@ -1,8 +1,8 @@
import abc
import argparse
from typing import TYPE_CHECKING, Any, Optional, Sequence

from opentelemetry import metrics, trace
import functools
import inspect
from typing import TYPE_CHECKING, Any, Callable, Optional, Sequence
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove unused imports


from jina.helper import convert_tuple_to_list
from jina.jaml import JAMLCompatible
Expand Down
12 changes: 9 additions & 3 deletions jina/serve/runtimes/gateway/__init__.py
Expand Up @@ -13,9 +13,6 @@
from jina.parsers.helper import _set_gateway_uses
from jina.serve.gateway import BaseGateway
from jina.serve.runtimes.asyncio import AsyncNewLoopRuntime
from jina.serve.runtimes.gateway.grpc import GRPCGateway
from jina.serve.runtimes.gateway.http import HTTPGateway
from jina.serve.runtimes.gateway.websocket import WebSocketGateway

if TYPE_CHECKING:
import multiprocessing
Expand Down Expand Up @@ -56,6 +53,7 @@ async def async_setup(self):
raise PortAlreadyUsed(f'port:{self.args.port}')

uses_with = self.args.uses_with or {}
print(f'--->args: {self.args}')
self.gateway = BaseGateway.load_config(
self.args.uses,
uses_with=dict(
Expand All @@ -73,6 +71,8 @@ async def async_setup(self):
ssl_certfile=self.args.ssl_certfile,
uvicorn_kwargs=self.args.uvicorn_kwargs,
proxy=self.args.proxy,
opentelemetry_tracing=self.opentelemetry_tracing,
tracer_provider=self.tracer_provider,
**uses_with,
),
uses_metas={},
Expand All @@ -88,6 +88,12 @@ async def async_setup(self):
timeout_send=self.timeout_send,
metrics_registry=self.metrics_registry,
runtime_name=self.args.name,
aio_tracing_client_interceptors=self.aio_tracing_client_interceptors(
self.tracer
),
tracing_client_interceptor=self.tracing_client_interceptor(
self.tracer_provider
),
)
await self.gateway.setup_server()

Expand Down
57 changes: 0 additions & 57 deletions jina/serve/runtimes/gateway/grpc/__init__.py
@@ -1,58 +1 @@
from jina.serve.runtimes.gateway.grpc.gateway import GRPCGateway

__all__ = ['GRPCGatewayRuntime']


class GRPCGatewayRuntime(GatewayRuntime):
"""Gateway Runtime for gRPC."""

async def async_setup(self):
"""
The async method to setup.

Create the gRPC server and expose the port for communication.
"""
if not self.args.proxy and os.name != 'nt':
os.unsetenv('http_proxy')
os.unsetenv('https_proxy')

if not (is_port_free(__default_host__, self.args.port)):
raise PortAlreadyUsed(f'port:{self.args.port}')

self.gateway = GRPCGateway(
name=self.name,
grpc_server_options=self.args.grpc_server_options,
grpc_tracing_server_interceptors=self.aio_tracing_server_interceptor(),
port=self.args.port,
ssl_keyfile=self.args.ssl_keyfile,
ssl_certfile=self.args.ssl_certfile,
)

self.gateway.set_streamer(
args=self.args,
timeout_send=self.timeout_send,
metrics_registry=self.metrics_registry,
runtime_name=self.name,
aio_tracing_client_interceptors=self.aio_tracing_client_interceptors(
self.tracer
),
tracing_client_interceptor=self.tracing_client_interceptor(
self.tracer_provider
),
)
await self.gateway.setup_server()

async def async_teardown(self):
"""Close the connection pool"""
# usually async_cancel should already have been called, but then its a noop
# if the runtime is stopped without a sigterm (e.g. as a context manager, this can happen)
await self.gateway.teardown()
await self.async_cancel()

async def async_cancel(self):
"""The async method to stop server."""
await self.gateway.stop_server()

async def async_run_forever(self):
"""The async running of server."""
await self.gateway.run_server()
72 changes: 1 addition & 71 deletions jina/serve/runtimes/gateway/http/__init__.py
@@ -1,71 +1 @@
import asyncio
import os

from jina import __default_host__
from jina.serve.runtimes.gateway import GatewayRuntime
from jina.serve.runtimes.gateway.http.app import get_fastapi_app

__all__ = ['HTTPGatewayRuntime']

from jina.serve.runtimes.gateway.http.gateway import HTTPGateway


class HTTPGatewayRuntime(GatewayRuntime):
"""Runtime for HTTP interface."""

async def async_setup(self):
"""
The async method setup the runtime.

Setup the uvicorn server.
"""
self.gateway = HTTPGateway(
name=self.name,
port=self.args.port,
title=self.args.title,
description=self.args.description,
no_debug_endpoints=self.args.no_debug_endpoints,
no_crud_endpoints=self.args.no_crud_endpoints,
expose_endpoints=self.args.expose_endpoints,
expose_graphql_endpoint=self.args.expose_graphql_endpoint,
cors=self.args.cors,
ssl_keyfile=self.args.ssl_keyfile,
ssl_certfile=self.args.ssl_certfile,
uvicorn_kwargs=self.args.uvicorn_kwargs,
opentelemetry_tracing=self.opentelemetry_tracing,
tracer_provider=self.tracer_provider,
)

self.gateway.set_streamer(
args=self.args,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why was this removed ? this can result in failing tests

timeout_send=self.timeout_send,
metrics_registry=self.metrics_registry,
runtime_name=self.args.name,
aio_tracing_client_interceptors=self.aio_tracing_client_interceptors(
self.tracer
),
tracing_client_interceptor=self.tracing_client_interceptor(
self.tracer_provider
),
)
await self.gateway.setup_server()

async def _wait_for_cancel(self):
"""Do NOT override this method when inheriting from :class:`GatewayPod`"""
# handle terminate signals
while not self.is_cancel.is_set() and not self.gateway.should_exit:
await asyncio.sleep(0.1)

await self.async_cancel()

async def async_teardown(self):
"""Shutdown the server."""
await self.gateway.teardown()

async def async_cancel(self):
"""Stop the server."""
await self.gateway.stop_server()

async def async_run_forever(self):
"""Running method of the server."""
await self.gateway.run_server()
from .gateway import HTTPGateway
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please use absolute import

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not my change. Using absolute import won't work. Maybe @alaeddine-13 can explain.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I probably missed this, but I believe it's still possible, it does not produce circular imports for other gateways

JoanFM marked this conversation as resolved.
Show resolved Hide resolved
3 changes: 3 additions & 0 deletions jina/serve/runtimes/gateway/http/gateway.py
Expand Up @@ -26,6 +26,7 @@ def __init__(
ssl_keyfile: Optional[str] = None,
ssl_certfile: Optional[str] = None,
uvicorn_kwargs: Optional[dict] = None,
proxy: Optional[bool] = None,
opentelemetry_tracing: Optional[bool] = None,
tracer_provider: Optional[trace.TracerProvider] = None,
**kwargs
Expand All @@ -45,6 +46,8 @@ def __init__(
:param ssl_keyfile: the path to the key file
:param ssl_certfile: the path to the certificate file
:param uvicorn_kwargs: Dictionary of kwargs arguments that will be passed to Uvicorn server when starting the server
:param proxy: If set, respect the http_proxy and https_proxy environment variables, otherwise, it will unset
these proxy variables before start. gRPC seems to prefer no proxy
:param opentelemetry_tracing: Enables tracing if set to True.
:param tracer_provider: If tracing is enabled the tracer_provider will be used to instrument the code.
:param kwargs: keyword args
Expand Down
59 changes: 5 additions & 54 deletions jina/serve/runtimes/gateway/websocket/__init__.py
@@ -1,57 +1,8 @@
from jina.serve.runtimes.gateway.websocket.gateway import WebSocketGateway


class WebSocketGatewayRuntime(GatewayRuntime):
"""Runtime for Websocket interface."""

async def async_setup(self):
"""
The async method setup the runtime.

Setup the uvicorn server.
"""
import asyncio

self.gateway = WebSocketGateway(
name=self.name,
port=self.args.port,
ssl_keyfile=self.args.ssl_keyfile,
ssl_certfile=self.args.ssl_certfile,
uvicorn_kwargs=self.args.uvicorn_kwargs,
logger=self.logger,
opentelemetry_tracing=self.opentelemetry_tracing,
tracer_provider=self.tracer_provider,
)
from jina.serve.runtimes.gateway import GatewayRuntime
from jina.serve.runtimes.gateway.websocket.app import get_fastapi_app

self.gateway.set_streamer(
args=self.args,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as in grpc gateway runtime

timeout_send=self.timeout_send,
metrics_registry=self.metrics_registry,
runtime_name=self.args.name,
aio_tracing_client_interceptors=self.aio_tracing_client_interceptors(
self.tracer
),
tracing_client_interceptor=self.tracing_client_interceptor(
self.tracer_provider
),
)
await self.gateway.setup_server()
__all__ = ['WebSocketGatewayRuntime']

async def _wait_for_cancel(self):
"""Do NOT override this method when inheriting from :class:`GatewayPod`"""
# handle terminate signals
while not self.is_cancel.is_set() and not self.gateway.should_exit:
await asyncio.sleep(0.1)

await self.async_cancel()

async def async_teardown(self):
"""Shutdown the server."""
await self.gateway.teardown()

async def async_cancel(self):
"""Stop the server."""
await self.gateway.stop_server()

async def async_run_forever(self):
"""Running method of ther server."""
await self.gateway.run_server()
from jina.serve.runtimes.gateway.websocket.gateway import WebSocketGateway
3 changes: 3 additions & 0 deletions jina/serve/runtimes/gateway/websocket/gateway.py
Expand Up @@ -19,6 +19,7 @@ def __init__(
ssl_keyfile: Optional[str] = None,
ssl_certfile: Optional[str] = None,
uvicorn_kwargs: Optional[dict] = None,
proxy: Optional[bool] = None,
opentelemetry_tracing: Optional[bool] = None,
tracer_provider: Optional[trace.TracerProvider] = None,
**kwargs
Expand All @@ -28,6 +29,8 @@ def __init__(
:param ssl_keyfile: the path to the key file
:param ssl_certfile: the path to the certificate file
:param uvicorn_kwargs: Dictionary of kwargs arguments that will be passed to Uvicorn server when starting the server
:param proxy: If set, respect the http_proxy and https_proxy environment variables, otherwise, it will unset
these proxy variables before start. gRPC seems to prefer no proxy
:param opentelemetry_tracing: Enables tracing if set to True.
:param tracer_provider: If tracing is enabled the tracer_provider will be used to instrument the code.
:param kwargs: keyword args
Expand Down
Expand Up @@ -39,49 +39,14 @@ async def intercept_service(self, continuation, handler_call_details):
return self._deny

class AlternativeGRPCGateway(GRPCGateway):
def __init__(self, namespace_args, *args, **kwargs):
super(AlternativeGRPCGateway, self).__init__(
args=namespace_args, *args, **kwargs
)
def __init__(self, *args, **kwargs):
super(AlternativeGRPCGateway, self).__init__(*args, **kwargs)
self.server = grpc.aio.server(
interceptors=(AuthInterceptor('access_key'),),
options=_get_grpc_server_options(self.grpc_server_options),
)

class AlternativeGRPCGatewayRuntime(_GRPCGatewayRuntime):
async def async_setup(self):
"""
The async method to setup.
Create the gRPC server and expose the port for communication.
"""
if not self.args.proxy and os.name != 'nt':
os.unsetenv('http_proxy')
os.unsetenv('https_proxy')

if not (is_port_free(__default_host__, self.args.port)):
raise PortAlreadyUsed(f'port:{self.args.port}')

self.gateway = AlternativeGRPCGateway(
name=self.name,
namespace_args=self.args,
grpc_server_options=self.args.grpc_server_options,
port=self.args.port,
ssl_keyfile=self.args.ssl_keyfile,
ssl_certfile=self.args.ssl_certfile,
)

self.gateway.set_streamer(
timeout_send=self.timeout_send,
metrics_registry=self.metrics_registry,
runtime_name=self.name,
)
await self.gateway.setup_server()

monkeypatch.setattr(
'jina.serve.runtimes.gateway.grpc.GRPCGatewayRuntime',
AlternativeGRPCGatewayRuntime,
)
return Flow(protocol='grpc').add()
return Flow(protocol='grpc', uses=AlternativeGRPCGateway).add()


def test_client_grpc_kwargs(flow_with_grpc):
Expand Down