Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add grpc.aio support #1245

Merged
merged 29 commits into from
Oct 31, 2022
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
03e021b
Add support for service-side grpc.aio
cookiefission Aug 24, 2022
ff4403c
Add grpc.aio client-side support
cookiefission Aug 30, 2022
a955218
Fix assorted CI issues
cookiefission Sep 2, 2022
74c857c
Add changelog entry
cookiefission Sep 2, 2022
8cd1c9a
Merge branch 'main' into grpc-aio-support
cookiefission Sep 21, 2022
7802001
Fix conflict properly
cookiefission Sep 21, 2022
c64b884
Merge branch 'main' into grpc-aio-support
cookiefission Sep 21, 2022
a42dd5d
Merge branch 'main' into grpc-aio-support
ocelotl Sep 27, 2022
1098e64
Merge branch 'main' into grpc-aio-support
srikanthccv Oct 5, 2022
9cc91b4
Skip grpc.aio tests in Python 3.7
cookiefission Sep 21, 2022
6c1d0a7
Update filters to handle grpc.aio.ClientCallDetails
cookiefission Oct 7, 2022
cf01d06
Update grpc.aio client interceptor to handle grpc bug
cookiefission Oct 7, 2022
600ca83
Add filter support to grpc.aio client interceptor
cookiefission Oct 10, 2022
c4c2e86
Merge branch 'main' into grpc-aio-support
cookiefission Oct 10, 2022
6622774
Add basic server filtering test
cookiefission Oct 10, 2022
db4f86e
Format code with black
cookiefission Oct 10, 2022
5de1a4b
Re-order imports with isort
cookiefission Oct 10, 2022
ac546be
Merge branch 'open-telemetry:main' into grpc-aio-support
cookiefission Oct 12, 2022
4824822
Add additional tests for aio server filtering
cookiefission Oct 12, 2022
94ba2f7
Reformat code to pass linting
cookiefission Oct 13, 2022
028a64e
Merge branch 'main' into grpc-aio-support
cookiefission Oct 13, 2022
c9f2a60
Merge branch 'main' into grpc-aio-support
cookiefission Oct 16, 2022
0935e6a
Merge branch 'main' into grpc-aio-support
cookiefission Oct 20, 2022
e1304ed
Merge branch 'main' into grpc-aio-support
cookiefission Oct 24, 2022
e30a21b
Merge branch 'main' into grpc-aio-support
cookiefission Oct 25, 2022
274a6f6
Merge branch 'main' into grpc-aio-support
srikanthccv Oct 29, 2022
fe3cb18
Update CHANGELOG.md
cookiefission Oct 31, 2022
452360d
Merge branch 'main' into grpc-aio-support
srikanthccv Oct 31, 2022
5a6060d
Merge branch 'main' into grpc-aio-support
srikanthccv Oct 31, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def serve():
logging.basicConfig()
serve()

You can also add the instrumentor manually, rather than using
You can also add the interceptor manually, rather than using
:py:class:`~opentelemetry.instrumentation.grpc.GrpcInstrumentorServer`:

.. code-block:: python
Expand All @@ -118,6 +118,117 @@ def serve():
server = grpc.server(futures.ThreadPoolExecutor(),
interceptors = [server_interceptor()])

Usage Aio Client
----------------
.. code-block:: python

import logging
import asyncio

import grpc

from opentelemetry import trace
from opentelemetry.instrumentation.grpc import GrpcAioInstrumentorClient
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
ConsoleSpanExporter,
SimpleSpanProcessor,
)

try:
from .gen import helloworld_pb2, helloworld_pb2_grpc
except ImportError:
from gen import helloworld_pb2, helloworld_pb2_grpc

trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(ConsoleSpanExporter())
)

grpc_client_instrumentor = GrpcAioInstrumentorClient()
grpc_client_instrumentor.instrument()

async def run():
with grpc.aio.insecure_channel("localhost:50051") as channel:

stub = helloworld_pb2_grpc.GreeterStub(channel)
response = await stub.SayHello(helloworld_pb2.HelloRequest(name="YOU"))

print("Greeter client received: " + response.message)


if __name__ == "__main__":
logging.basicConfig()
asyncio.run(run())

You can also add the interceptor manually, rather than using
:py:class:`~opentelemetry.instrumentation.grpc.GrpcAioInstrumentorClient`:

.. code-block:: python

from opentelemetry.instrumentation.grpc import aio_client_interceptors

channel = grpc.aio.insecure_channel("localhost:12345", interceptors=aio_client_interceptors())


Usage Aio Server
----------------
.. code-block:: python

import logging
import asyncio

import grpc

from opentelemetry import trace
from opentelemetry.instrumentation.grpc import GrpcAioInstrumentorServer
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
ConsoleSpanExporter,
SimpleSpanProcessor,
)

try:
from .gen import helloworld_pb2, helloworld_pb2_grpc
except ImportError:
from gen import helloworld_pb2, helloworld_pb2_grpc

trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(ConsoleSpanExporter())
)

grpc_server_instrumentor = GrpcAioInstrumentorServer()
grpc_server_instrumentor.instrument()

class Greeter(helloworld_pb2_grpc.GreeterServicer):
async def SayHello(self, request, context):
return helloworld_pb2.HelloReply(message="Hello, %s!" % request.name)


async def serve():

server = grpc.aio.server()

helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
server.add_insecure_port("[::]:50051")
await server.start()
await server.wait_for_termination()


if __name__ == "__main__":
logging.basicConfig()
asyncio.run(serve())

You can also add the interceptor manually, rather than using
:py:class:`~opentelemetry.instrumentation.grpc.GrpcAioInstrumentorServer`:

.. code-block:: python

from opentelemetry.instrumentation.grpc import aio_server_interceptor

server = grpc.aio.server(interceptors = [aio_server_interceptor()])

Filters
-------

Expand Down Expand Up @@ -244,6 +355,58 @@ def _uninstrument(self, **kwargs):
grpc.server = self._original_func


class GrpcAioInstrumentorServer(BaseInstrumentor):
"""
Globally instrument the grpc.aio server.

Usage::

grpc_aio_server_instrumentor = GrpcAioInstrumentorServer()
grpc_aio_server_instrumentor.instrument()

"""

# pylint:disable=attribute-defined-outside-init, redefined-outer-name

def __init__(self, filter_=None):
excluded_service_filter = _excluded_service_filter()
if excluded_service_filter is not None:
if filter_ is None:
filter_ = excluded_service_filter
else:
filter_ = any_of(filter_, excluded_service_filter)
self._filter = filter_

def instrumentation_dependencies(self) -> Collection[str]:
return _instruments

def _instrument(self, **kwargs):
self._original_func = grpc.aio.server
tracer_provider = kwargs.get("tracer_provider")

def server(*args, **kwargs):
if "interceptors" in kwargs:
# add our interceptor as the first
kwargs["interceptors"].insert(
0,
aio_server_interceptor(
tracer_provider=tracer_provider, filter_=self._filter
),
)
else:
kwargs["interceptors"] = [
aio_server_interceptor(
tracer_provider=tracer_provider, filter_=self._filter
)
]
return self._original_func(*args, **kwargs)

grpc.aio.server = server

def _uninstrument(self, **kwargs):
grpc.aio.server = self._original_func


class GrpcInstrumentorClient(BaseInstrumentor):
"""
Globally instrument the grpc client
Expand Down Expand Up @@ -315,6 +478,69 @@ def wrapper_fn(self, original_func, instance, args, kwargs):
)


class GrpcAioInstrumentorClient(BaseInstrumentor):
"""
Globally instrument the grpc.aio client.

Usage::

grpc_aio_client_instrumentor = GrpcAioInstrumentorClient()
grpc_aio_client_instrumentor.instrument()

"""

# pylint:disable=attribute-defined-outside-init, redefined-outer-name

def __init__(self, filter_=None):
excluded_service_filter = _excluded_service_filter()
if excluded_service_filter is not None:
if filter_ is None:
filter_ = excluded_service_filter
else:
filter_ = any_of(filter_, excluded_service_filter)
self._filter = filter_

def instrumentation_dependencies(self) -> Collection[str]:
return _instruments

def _add_interceptors(self, tracer_provider, kwargs):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add parameter types?

if "interceptors" in kwargs and kwargs["interceptors"]:
kwargs["interceptors"] = (
aio_client_interceptors(
tracer_provider=tracer_provider, filter_=self._filter
)
+ kwargs["interceptors"]
)
else:
kwargs["interceptors"] = aio_client_interceptors(
tracer_provider=tracer_provider, filter_=self._filter
)

return kwargs

def _instrument(self, **kwargs):
self._original_insecure = grpc.aio.insecure_channel
self._original_secure = grpc.aio.secure_channel
tracer_provider = kwargs.get("tracer_provider")

def insecure(*args, **kwargs):
kwargs = self._add_interceptors(tracer_provider, kwargs)

return self._original_insecure(*args, **kwargs)

def secure(*args, **kwargs):
kwargs = self._add_interceptors(tracer_provider, kwargs)

return self._original_secure(*args, **kwargs)

grpc.aio.insecure_channel = insecure
grpc.aio.secure_channel = secure

def _uninstrument(self, **kwargs):
grpc.aio.insecure_channel = self._original_insecure
grpc.aio.secure_channel = self._original_secure


def client_interceptor(tracer_provider=None, filter_=None):
"""Create a gRPC client channel interceptor.

Expand Down Expand Up @@ -355,6 +581,45 @@ def server_interceptor(tracer_provider=None, filter_=None):
return _server.OpenTelemetryServerInterceptor(tracer, filter_=filter_)


def aio_client_interceptors(tracer_provider=None, filter_=None):
"""Create a gRPC client channel interceptor.

Args:
tracer: The tracer to use to create client-side spans.

Returns:
An invocation-side interceptor object.
"""
from . import _aio_client

tracer = trace.get_tracer(__name__, __version__, tracer_provider)

return [
_aio_client.UnaryUnaryAioClientInterceptor(tracer, filter_=filter_),
_aio_client.UnaryStreamAioClientInterceptor(tracer, filter_=filter_),
_aio_client.StreamUnaryAioClientInterceptor(tracer, filter_=filter_),
_aio_client.StreamStreamAioClientInterceptor(tracer, filter_=filter_),
]


def aio_server_interceptor(tracer_provider=None, filter_=None):
"""Create a gRPC aio server interceptor.

Args:
tracer: The tracer to use to create server-side spans.

Returns:
A service-side interceptor object.
"""
from . import _aio_server

tracer = trace.get_tracer(__name__, __version__, tracer_provider)

return _aio_server.OpenTelemetryAioServerInterceptor(
tracer, filter_=filter_
)


def _excluded_service_filter() -> Union[Callable[[object], bool], None]:
services = _parse_services(
os.environ.get("OTEL_PYTHON_GRPC_EXCLUDED_SERVICES", "")
Expand Down
Loading