Skip to content

Commit

Permalink
Add grpc.aio support (#1245)
Browse files Browse the repository at this point in the history
  • Loading branch information
cookiefission committed Oct 31, 2022
1 parent 38d384a commit f58d16b
Show file tree
Hide file tree
Showing 12 changed files with 1,949 additions and 2 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#1413](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1413))
- `opentelemetry-instrumentation-pyramid` Add support for regular expression matching and sanitization of HTTP headers.
([#1414](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1414))
- `opentelemetry-instrumentation-grpc` Add support for grpc.aio Clients and Servers
([#1245](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1245))
- Add metric exporter for Prometheus Remote Write
([#1359](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1359))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def serve():
logging.basicConfig()
serve()
You can also add the instrumentor manually, rather than using
You can also add the interceptor manually, rather than using
:py:class:`~opentelemetry.instrumentation.grpc.GrpcInstrumentorServer`:
.. code-block:: python
Expand All @@ -118,6 +118,117 @@ def serve():
server = grpc.server(futures.ThreadPoolExecutor(),
interceptors = [server_interceptor()])
Usage Aio Client
----------------
.. code-block:: python
import logging
import asyncio
import grpc
from opentelemetry import trace
from opentelemetry.instrumentation.grpc import GrpcAioInstrumentorClient
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
ConsoleSpanExporter,
SimpleSpanProcessor,
)
try:
from .gen import helloworld_pb2, helloworld_pb2_grpc
except ImportError:
from gen import helloworld_pb2, helloworld_pb2_grpc
trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(ConsoleSpanExporter())
)
grpc_client_instrumentor = GrpcAioInstrumentorClient()
grpc_client_instrumentor.instrument()
async def run():
with grpc.aio.insecure_channel("localhost:50051") as channel:
stub = helloworld_pb2_grpc.GreeterStub(channel)
response = await stub.SayHello(helloworld_pb2.HelloRequest(name="YOU"))
print("Greeter client received: " + response.message)
if __name__ == "__main__":
logging.basicConfig()
asyncio.run(run())
You can also add the interceptor manually, rather than using
:py:class:`~opentelemetry.instrumentation.grpc.GrpcAioInstrumentorClient`:
.. code-block:: python
from opentelemetry.instrumentation.grpc import aio_client_interceptors
channel = grpc.aio.insecure_channel("localhost:12345", interceptors=aio_client_interceptors())
Usage Aio Server
----------------
.. code-block:: python
import logging
import asyncio
import grpc
from opentelemetry import trace
from opentelemetry.instrumentation.grpc import GrpcAioInstrumentorServer
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
ConsoleSpanExporter,
SimpleSpanProcessor,
)
try:
from .gen import helloworld_pb2, helloworld_pb2_grpc
except ImportError:
from gen import helloworld_pb2, helloworld_pb2_grpc
trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(ConsoleSpanExporter())
)
grpc_server_instrumentor = GrpcAioInstrumentorServer()
grpc_server_instrumentor.instrument()
class Greeter(helloworld_pb2_grpc.GreeterServicer):
async def SayHello(self, request, context):
return helloworld_pb2.HelloReply(message="Hello, %s!" % request.name)
async def serve():
server = grpc.aio.server()
helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
server.add_insecure_port("[::]:50051")
await server.start()
await server.wait_for_termination()
if __name__ == "__main__":
logging.basicConfig()
asyncio.run(serve())
You can also add the interceptor manually, rather than using
:py:class:`~opentelemetry.instrumentation.grpc.GrpcAioInstrumentorServer`:
.. code-block:: python
from opentelemetry.instrumentation.grpc import aio_server_interceptor
server = grpc.aio.server(interceptors = [aio_server_interceptor()])
Filters
-------
Expand Down Expand Up @@ -244,6 +355,58 @@ def _uninstrument(self, **kwargs):
grpc.server = self._original_func


class GrpcAioInstrumentorServer(BaseInstrumentor):
"""
Globally instrument the grpc.aio server.
Usage::
grpc_aio_server_instrumentor = GrpcAioInstrumentorServer()
grpc_aio_server_instrumentor.instrument()
"""

# pylint:disable=attribute-defined-outside-init, redefined-outer-name

def __init__(self, filter_=None):
excluded_service_filter = _excluded_service_filter()
if excluded_service_filter is not None:
if filter_ is None:
filter_ = excluded_service_filter
else:
filter_ = any_of(filter_, excluded_service_filter)
self._filter = filter_

def instrumentation_dependencies(self) -> Collection[str]:
return _instruments

def _instrument(self, **kwargs):
self._original_func = grpc.aio.server
tracer_provider = kwargs.get("tracer_provider")

def server(*args, **kwargs):
if "interceptors" in kwargs:
# add our interceptor as the first
kwargs["interceptors"].insert(
0,
aio_server_interceptor(
tracer_provider=tracer_provider, filter_=self._filter
),
)
else:
kwargs["interceptors"] = [
aio_server_interceptor(
tracer_provider=tracer_provider, filter_=self._filter
)
]
return self._original_func(*args, **kwargs)

grpc.aio.server = server

def _uninstrument(self, **kwargs):
grpc.aio.server = self._original_func


class GrpcInstrumentorClient(BaseInstrumentor):
"""
Globally instrument the grpc client
Expand Down Expand Up @@ -315,6 +478,69 @@ def wrapper_fn(self, original_func, instance, args, kwargs):
)


class GrpcAioInstrumentorClient(BaseInstrumentor):
"""
Globally instrument the grpc.aio client.
Usage::
grpc_aio_client_instrumentor = GrpcAioInstrumentorClient()
grpc_aio_client_instrumentor.instrument()
"""

# pylint:disable=attribute-defined-outside-init, redefined-outer-name

def __init__(self, filter_=None):
excluded_service_filter = _excluded_service_filter()
if excluded_service_filter is not None:
if filter_ is None:
filter_ = excluded_service_filter
else:
filter_ = any_of(filter_, excluded_service_filter)
self._filter = filter_

def instrumentation_dependencies(self) -> Collection[str]:
return _instruments

def _add_interceptors(self, tracer_provider, kwargs):
if "interceptors" in kwargs and kwargs["interceptors"]:
kwargs["interceptors"] = (
aio_client_interceptors(
tracer_provider=tracer_provider, filter_=self._filter
)
+ kwargs["interceptors"]
)
else:
kwargs["interceptors"] = aio_client_interceptors(
tracer_provider=tracer_provider, filter_=self._filter
)

return kwargs

def _instrument(self, **kwargs):
self._original_insecure = grpc.aio.insecure_channel
self._original_secure = grpc.aio.secure_channel
tracer_provider = kwargs.get("tracer_provider")

def insecure(*args, **kwargs):
kwargs = self._add_interceptors(tracer_provider, kwargs)

return self._original_insecure(*args, **kwargs)

def secure(*args, **kwargs):
kwargs = self._add_interceptors(tracer_provider, kwargs)

return self._original_secure(*args, **kwargs)

grpc.aio.insecure_channel = insecure
grpc.aio.secure_channel = secure

def _uninstrument(self, **kwargs):
grpc.aio.insecure_channel = self._original_insecure
grpc.aio.secure_channel = self._original_secure


def client_interceptor(tracer_provider=None, filter_=None):
"""Create a gRPC client channel interceptor.
Expand Down Expand Up @@ -355,6 +581,45 @@ def server_interceptor(tracer_provider=None, filter_=None):
return _server.OpenTelemetryServerInterceptor(tracer, filter_=filter_)


def aio_client_interceptors(tracer_provider=None, filter_=None):
"""Create a gRPC client channel interceptor.
Args:
tracer: The tracer to use to create client-side spans.
Returns:
An invocation-side interceptor object.
"""
from . import _aio_client

tracer = trace.get_tracer(__name__, __version__, tracer_provider)

return [
_aio_client.UnaryUnaryAioClientInterceptor(tracer, filter_=filter_),
_aio_client.UnaryStreamAioClientInterceptor(tracer, filter_=filter_),
_aio_client.StreamUnaryAioClientInterceptor(tracer, filter_=filter_),
_aio_client.StreamStreamAioClientInterceptor(tracer, filter_=filter_),
]


def aio_server_interceptor(tracer_provider=None, filter_=None):
"""Create a gRPC aio server interceptor.
Args:
tracer: The tracer to use to create server-side spans.
Returns:
A service-side interceptor object.
"""
from . import _aio_server

tracer = trace.get_tracer(__name__, __version__, tracer_provider)

return _aio_server.OpenTelemetryAioServerInterceptor(
tracer, filter_=filter_
)


def _excluded_service_filter() -> Union[Callable[[object], bool], None]:
services = _parse_services(
os.environ.get("OTEL_PYTHON_GRPC_EXCLUDED_SERVICES", "")
Expand Down
Loading

0 comments on commit f58d16b

Please sign in to comment.