From 759b52b1b3aa9404625ceabb9d50c179730642db Mon Sep 17 00:00:00 2001 From: Mihir Gore Date: Wed, 14 Apr 2021 10:04:48 +0530 Subject: [PATCH] Keep client interceptors in sync with grpc client interceptors --- .../grpc/grpcext/_interceptor.py | 130 +++++++++++++++--- .../tests/test_client_interceptor.py | 42 ++++++ 2 files changed, 156 insertions(+), 16 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py index 89889aceeb..d52dd30d4a 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py @@ -48,9 +48,24 @@ def __init__(self, method, base_callable, interceptor): self._base_callable = base_callable self._interceptor = interceptor - def __call__(self, request, timeout=None, metadata=None, credentials=None): + def __call__( + self, + request, + timeout=None, + metadata=None, + credentials=None, + wait_for_ready=None, + compression=None, + ): def invoker(request, metadata): - return self._base_callable(request, timeout, metadata, credentials) + return self._base_callable( + request, + timeout, + metadata, + credentials, + wait_for_ready, + compression, + ) client_info = _UnaryClientInfo(self._method, timeout) return self._interceptor.intercept_unary( @@ -58,11 +73,22 @@ def invoker(request, metadata): ) def with_call( - self, request, timeout=None, metadata=None, credentials=None + self, + request, + timeout=None, + metadata=None, + credentials=None, + wait_for_ready=None, + compression=None, ): def invoker(request, metadata): return self._base_callable.with_call( - request, timeout, metadata, credentials + request, + timeout, + metadata, + credentials, + wait_for_ready, + compression, ) client_info = _UnaryClientInfo(self._method, timeout) @@ -70,10 +96,23 @@ def invoker(request, metadata): request, metadata, client_info, invoker ) - def future(self, request, timeout=None, metadata=None, credentials=None): + def future( + self, + request, + timeout=None, + metadata=None, + credentials=None, + wait_for_ready=None, + compression=None, + ): def invoker(request, metadata): return self._base_callable.future( - request, timeout, metadata, credentials + request, + timeout, + metadata, + credentials, + wait_for_ready, + compression, ) client_info = _UnaryClientInfo(self._method, timeout) @@ -88,9 +127,24 @@ def __init__(self, method, base_callable, interceptor): self._base_callable = base_callable self._interceptor = interceptor - def __call__(self, request, timeout=None, metadata=None, credentials=None): + def __call__( + self, + request, + timeout=None, + metadata=None, + credentials=None, + wait_for_ready=None, + compression=None, + ): def invoker(request, metadata): - return self._base_callable(request, timeout, metadata, credentials) + return self._base_callable( + request, + timeout, + metadata, + credentials, + wait_for_ready, + compression, + ) client_info = _StreamClientInfo(self._method, False, True, timeout) return self._interceptor.intercept_stream( @@ -105,11 +159,22 @@ def __init__(self, method, base_callable, interceptor): self._interceptor = interceptor def __call__( - self, request_iterator, timeout=None, metadata=None, credentials=None + self, + request_iterator, + timeout=None, + metadata=None, + credentials=None, + wait_for_ready=None, + compression=None, ): def invoker(request_iterator, metadata): return self._base_callable( - request_iterator, timeout, metadata, credentials + request_iterator, + timeout, + metadata, + credentials, + wait_for_ready, + compression, ) client_info = _StreamClientInfo(self._method, True, False, timeout) @@ -118,11 +183,22 @@ def invoker(request_iterator, metadata): ) def with_call( - self, request_iterator, timeout=None, metadata=None, credentials=None + self, + request_iterator, + timeout=None, + metadata=None, + credentials=None, + wait_for_ready=None, + compression=None, ): def invoker(request_iterator, metadata): return self._base_callable.with_call( - request_iterator, timeout, metadata, credentials + request_iterator, + timeout, + metadata, + credentials, + wait_for_ready, + compression, ) client_info = _StreamClientInfo(self._method, True, False, timeout) @@ -131,11 +207,22 @@ def invoker(request_iterator, metadata): ) def future( - self, request_iterator, timeout=None, metadata=None, credentials=None + self, + request_iterator, + timeout=None, + metadata=None, + credentials=None, + wait_for_ready=None, + compression=None, ): def invoker(request_iterator, metadata): return self._base_callable.future( - request_iterator, timeout, metadata, credentials + request_iterator, + timeout, + metadata, + credentials, + wait_for_ready, + compression, ) client_info = _StreamClientInfo(self._method, True, False, timeout) @@ -151,11 +238,22 @@ def __init__(self, method, base_callable, interceptor): self._interceptor = interceptor def __call__( - self, request_iterator, timeout=None, metadata=None, credentials=None + self, + request_iterator, + timeout=None, + metadata=None, + credentials=None, + wait_for_ready=None, + compression=None, ): def invoker(request_iterator, metadata): return self._base_callable( - request_iterator, timeout, metadata, credentials + request_iterator, + timeout, + metadata, + credentials, + wait_for_ready, + compression, ) client_info = _StreamClientInfo(self._method, True, True, timeout) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py index c9466b631e..0f430648e2 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py @@ -40,13 +40,55 @@ from .protobuf.test_server_pb2 import Request +class Interceptor( + grpc.UnaryUnaryClientInterceptor, + grpc.UnaryStreamClientInterceptor, + grpc.StreamUnaryClientInterceptor, + grpc.StreamStreamClientInterceptor, +): + def __init__(self): + pass + + def intercept_unary_unary( + self, continuation, client_call_details, request + ): + return self._intercept_call(continuation, client_call_details, request) + + def intercept_unary_stream( + self, continuation, client_call_details, request + ): + return self._intercept_call(continuation, client_call_details, request) + + def intercept_stream_unary( + self, continuation, client_call_details, request_iterator + ): + return self._intercept_call( + continuation, client_call_details, request_iterator + ) + + def intercept_stream_stream( + self, continuation, client_call_details, request_iterator + ): + return self._intercept_call( + continuation, client_call_details, request_iterator + ) + + @staticmethod + def _intercept_call( + continuation, client_call_details, request_or_iterator + ): + return continuation(client_call_details, request_or_iterator) + + class TestClientProto(TestBase): def setUp(self): super().setUp() GrpcInstrumentorClient().instrument() self.server = create_test_server(25565) self.server.start() + interceptors = [Interceptor()] self.channel = grpc.insecure_channel("localhost:25565") + self.channel = grpc.intercept_channel(self.channel, *interceptors) self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel) def tearDown(self):