diff --git a/CHANGELOG.md b/CHANGELOG.md index b4e5053138..5a8abe921a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -64,7 +64,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#436](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/436)) - `opentelemetry-instrumenation-flask` now supports trace response headers. ([#436](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/436)) - +- `opentelemetry-instrumentation-grpc` Keep client interceptor in sync with grpc client interceptors. + ([#442](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/442)) + ### Removed - Remove `http.status_text` from span attributes ([#406](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/406)) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py index 89889aceeb..53ee46a20d 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py @@ -13,9 +13,7 @@ # limitations under the License. # pylint:disable=relative-beyond-top-level -# pylint:disable=arguments-differ # pylint:disable=no-member -# pylint:disable=signature-differs """Implementation of gRPC Python interceptors.""" @@ -48,9 +46,24 @@ def __init__(self, method, base_callable, interceptor): self._base_callable = base_callable self._interceptor = interceptor - def __call__(self, request, timeout=None, metadata=None, credentials=None): + def __call__( + self, + request, + timeout=None, + metadata=None, + credentials=None, + wait_for_ready=None, + compression=None, + ): def invoker(request, metadata): - return self._base_callable(request, timeout, metadata, credentials) + return self._base_callable( + request, + timeout, + metadata, + credentials, + wait_for_ready, + compression, + ) client_info = _UnaryClientInfo(self._method, timeout) return self._interceptor.intercept_unary( @@ -58,11 +71,22 @@ def invoker(request, metadata): ) def with_call( - self, request, timeout=None, metadata=None, credentials=None + self, + request, + timeout=None, + metadata=None, + credentials=None, + wait_for_ready=None, + compression=None, ): def invoker(request, metadata): return self._base_callable.with_call( - request, timeout, metadata, credentials + request, + timeout, + metadata, + credentials, + wait_for_ready, + compression, ) client_info = _UnaryClientInfo(self._method, timeout) @@ -70,10 +94,23 @@ def invoker(request, metadata): request, metadata, client_info, invoker ) - def future(self, request, timeout=None, metadata=None, credentials=None): + def future( + self, + request, + timeout=None, + metadata=None, + credentials=None, + wait_for_ready=None, + compression=None, + ): def invoker(request, metadata): return self._base_callable.future( - request, timeout, metadata, credentials + request, + timeout, + metadata, + credentials, + wait_for_ready, + compression, ) client_info = _UnaryClientInfo(self._method, timeout) @@ -88,9 +125,24 @@ def __init__(self, method, base_callable, interceptor): self._base_callable = base_callable self._interceptor = interceptor - def __call__(self, request, timeout=None, metadata=None, credentials=None): + def __call__( + self, + request, + timeout=None, + metadata=None, + credentials=None, + wait_for_ready=None, + compression=None, + ): def invoker(request, metadata): - return self._base_callable(request, timeout, metadata, credentials) + return self._base_callable( + request, + timeout, + metadata, + credentials, + wait_for_ready, + compression, + ) client_info = _StreamClientInfo(self._method, False, True, timeout) return self._interceptor.intercept_stream( @@ -105,11 +157,22 @@ def __init__(self, method, base_callable, interceptor): self._interceptor = interceptor def __call__( - self, request_iterator, timeout=None, metadata=None, credentials=None + self, + request_iterator, + timeout=None, + metadata=None, + credentials=None, + wait_for_ready=None, + compression=None, ): def invoker(request_iterator, metadata): return self._base_callable( - request_iterator, timeout, metadata, credentials + request_iterator, + timeout, + metadata, + credentials, + wait_for_ready, + compression, ) client_info = _StreamClientInfo(self._method, True, False, timeout) @@ -118,11 +181,22 @@ def invoker(request_iterator, metadata): ) def with_call( - self, request_iterator, timeout=None, metadata=None, credentials=None + self, + request_iterator, + timeout=None, + metadata=None, + credentials=None, + wait_for_ready=None, + compression=None, ): def invoker(request_iterator, metadata): return self._base_callable.with_call( - request_iterator, timeout, metadata, credentials + request_iterator, + timeout, + metadata, + credentials, + wait_for_ready, + compression, ) client_info = _StreamClientInfo(self._method, True, False, timeout) @@ -131,11 +205,22 @@ def invoker(request_iterator, metadata): ) def future( - self, request_iterator, timeout=None, metadata=None, credentials=None + self, + request_iterator, + timeout=None, + metadata=None, + credentials=None, + wait_for_ready=None, + compression=None, ): def invoker(request_iterator, metadata): return self._base_callable.future( - request_iterator, timeout, metadata, credentials + request_iterator, + timeout, + metadata, + credentials, + wait_for_ready, + compression, ) client_info = _StreamClientInfo(self._method, True, False, timeout) @@ -151,11 +236,22 @@ def __init__(self, method, base_callable, interceptor): self._interceptor = interceptor def __call__( - self, request_iterator, timeout=None, metadata=None, credentials=None + self, + request_iterator, + timeout=None, + metadata=None, + credentials=None, + wait_for_ready=None, + compression=None, ): def invoker(request_iterator, metadata): return self._base_callable( - request_iterator, timeout, metadata, credentials + request_iterator, + timeout, + metadata, + credentials, + wait_for_ready, + compression, ) client_info = _StreamClientInfo(self._method, True, True, timeout) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py index f088f5cf8c..109a0d8563 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py @@ -41,13 +41,57 @@ from .protobuf.test_server_pb2 import Request +# User defined interceptor. Is used in the tests along with the opentelemetry client interceptor. +class Interceptor( + grpc.UnaryUnaryClientInterceptor, + grpc.UnaryStreamClientInterceptor, + grpc.StreamUnaryClientInterceptor, + grpc.StreamStreamClientInterceptor, +): + def __init__(self): + pass + + def intercept_unary_unary( + self, continuation, client_call_details, request + ): + return self._intercept_call(continuation, client_call_details, request) + + def intercept_unary_stream( + self, continuation, client_call_details, request + ): + return self._intercept_call(continuation, client_call_details, request) + + def intercept_stream_unary( + self, continuation, client_call_details, request_iterator + ): + return self._intercept_call( + continuation, client_call_details, request_iterator + ) + + def intercept_stream_stream( + self, continuation, client_call_details, request_iterator + ): + return self._intercept_call( + continuation, client_call_details, request_iterator + ) + + @staticmethod + def _intercept_call( + continuation, client_call_details, request_or_iterator + ): + return continuation(client_call_details, request_or_iterator) + + class TestClientProto(TestBase): def setUp(self): super().setUp() GrpcInstrumentorClient().instrument() self.server = create_test_server(25565) self.server.start() + # use a user defined interceptor along with the opentelemetry client interceptor + interceptors = [Interceptor()] self.channel = grpc.insecure_channel("localhost:25565") + self.channel = grpc.intercept_channel(self.channel, *interceptors) self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel) def tearDown(self):