From 691bac34ea42f690f7a15d23ce56ff21f6bb71e4 Mon Sep 17 00:00:00 2001 From: sengjea Date: Sat, 5 Jun 2021 09:15:17 +0100 Subject: [PATCH 1/7] fix asynchonous unary call traces --- .../instrumentation/grpc/_client.py | 131 ++++++++---------- .../tests/_client.py | 7 + .../tests/test_client_interceptor.py | 15 ++ 3 files changed, 82 insertions(+), 71 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py index 7d3d22e714..74f2f336f2 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py @@ -33,27 +33,6 @@ from opentelemetry.trace.status import Status, StatusCode -class _GuardedSpan: - def __init__(self, span): - self.span = span - self.generated_span = None - self._engaged = True - - def __enter__(self): - self.generated_span = self.span.__enter__() - return self - - def __exit__(self, *args, **kwargs): - if self._engaged: - self.generated_span = None - return self.span.__exit__(*args, **kwargs) - return False - - def release(self): - self._engaged = False - return self.span - - class _CarrierSetter(Setter): """We use a custom setter in order to be able to lower case keys as is required by grpc. @@ -68,7 +47,7 @@ def set(self, carrier: MutableMapping[str, str], key: str, value: str): def _make_future_done_callback(span, rpc_info): def callback(response_future): - with span: + with trace.use_span(span, end_on_exit=True): code = response_future.code() if code != grpc.StatusCode.OK: rpc_info.error = code @@ -94,17 +73,17 @@ def _start_span(self, method): SpanAttributes.RPC_SERVICE: service, } - return self._tracer.start_as_current_span( - name=method, kind=trace.SpanKind.CLIENT, attributes=attributes + return self._tracer.start_span( + name=method, kind=trace.SpanKind.CLIENT, attributes=attributes, ) # pylint:disable=no-self-use - def _trace_result(self, guarded_span, rpc_info, result): + def _trace_result(self, span, rpc_info, result): # If the RPC is called asynchronously, release the guard and add a # callback so that the span can be finished once the future is done. if isinstance(result, grpc.Future): result.add_done_callback( - _make_future_done_callback(guarded_span.release(), rpc_info) + _make_future_done_callback(span, rpc_info) ) return result response = result @@ -115,41 +94,43 @@ def _trace_result(self, guarded_span, rpc_info, result): if isinstance(result, tuple): response = result[0] rpc_info.response = response - + span.end() return result - def _start_guarded_span(self, *args, **kwargs): - return _GuardedSpan(self._start_span(*args, **kwargs)) - def intercept_unary(self, request, metadata, client_info, invoker): if not metadata: mutable_metadata = OrderedDict() else: mutable_metadata = OrderedDict(metadata) - - with self._start_guarded_span(client_info.full_method) as guarded_span: - inject(mutable_metadata, setter=_carrier_setter) - metadata = tuple(mutable_metadata.items()) - - rpc_info = RpcInfo( - full_method=client_info.full_method, - metadata=metadata, - timeout=client_info.timeout, - request=request, - ) - + span = self._start_span(client_info.full_method) + with trace.use_span(span, record_exception=False, set_status_on_exception=False): try: - result = invoker(request, metadata) - except grpc.RpcError as err: - guarded_span.generated_span.set_status( - Status(StatusCode.ERROR) + inject(mutable_metadata, setter=_carrier_setter) + metadata = tuple(mutable_metadata.items()) + + rpc_info = RpcInfo( + full_method=client_info.full_method, + metadata=metadata, + timeout=client_info.timeout, + request=request, ) - guarded_span.generated_span.set_attribute( - SpanAttributes.RPC_GRPC_STATUS_CODE, err.code().value[0] - ) - raise err - return self._trace_result(guarded_span, rpc_info, result) + result = invoker(request, metadata) + except Exception as exc: + if isinstance(exc, grpc.RpcError): + span.set_attribute( + SpanAttributes.RPC_GRPC_STATUS_CODE, exc.code().value[0] + ) + span.set_status( + Status( + status_code=StatusCode.ERROR, + description="{}: {}".format(type(exc).__name__, exc), + ) + ) + span.record_exception(exc) + span.end() + raise exc + return self._trace_result(span, rpc_info, result) # For RPCs that stream responses, the result can be a generator. To record # the span across the generated responses and detect any errors, we wrap @@ -162,7 +143,8 @@ def _intercept_server_stream( else: mutable_metadata = OrderedDict(metadata) - with self._start_span(client_info.full_method) as span: + span = self._start_span(client_info.full_method) + with trace.use_span(span, end_on_exit=True): inject(mutable_metadata, setter=_carrier_setter) metadata = tuple(mutable_metadata.items()) rpc_info = RpcInfo( @@ -199,27 +181,34 @@ def intercept_stream( else: mutable_metadata = OrderedDict(metadata) - with self._start_guarded_span(client_info.full_method) as guarded_span: - inject(mutable_metadata, setter=_carrier_setter) - metadata = tuple(mutable_metadata.items()) - rpc_info = RpcInfo( - full_method=client_info.full_method, - metadata=metadata, - timeout=client_info.timeout, - request=request_or_iterator, - ) + span = self._start_span(client_info.full_method) + with trace.use_span(span, record_exception=False, set_status_on_exception=False): + try: + inject(mutable_metadata, setter=_carrier_setter) + metadata = tuple(mutable_metadata.items()) + rpc_info = RpcInfo( + full_method=client_info.full_method, + metadata=metadata, + timeout=client_info.timeout, + request=request_or_iterator, + ) - rpc_info.request = request_or_iterator + rpc_info.request = request_or_iterator - try: result = invoker(request_or_iterator, metadata) - except grpc.RpcError as err: - guarded_span.generated_span.set_status( - Status(StatusCode.ERROR) + except Exception as exc: + if isinstance(exc, grpc.RpcError): + span.set_attribute( + SpanAttributes.RPC_GRPC_STATUS_CODE, exc.code().value[0] + ) + span.set_status( + Status( + status_code=StatusCode.ERROR, + description="{}: {}".format(type(exc).__name__, exc), + ) ) - guarded_span.generated_span.set_attribute( - SpanAttributes.RPC_GRPC_STATUS_CODE, err.code().value[0], - ) - raise err + span.record_exception(exc) + span.end() + raise exc - return self._trace_result(guarded_span, rpc_info, result) + return self._trace_result(span, rpc_info, result) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py index 43310b5f65..69222b37a4 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py @@ -24,6 +24,13 @@ def simple_method(stub, error=False): stub.SimpleMethod(request) +def simple_method_future(stub, error=False): + request = Request( + client_id=CLIENT_ID, request_data="error" if error else "data" + ) + return stub.SimpleMethod.future(request) + + def client_streaming_method(stub, error=False): # create a generator def request_messages(): diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py index 109a0d8563..0075d3873c 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py @@ -36,6 +36,7 @@ client_streaming_method, server_streaming_method, simple_method, + simple_method_future, ) from ._server import create_test_server from .protobuf.test_server_pb2 import Request @@ -100,6 +101,20 @@ def tearDown(self): self.server.stop(None) self.channel.close() + def test_unary_unary_future(self): + simple_method_future(self._stub).result() + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual(span.name, "/GRPCTestServer/SimpleMethod") + self.assertIs(span.kind, trace.SpanKind.CLIENT) + + # Check version and name in span's instrumentation info + self.check_span_instrumentation_info( + span, opentelemetry.instrumentation.grpc + ) + def test_unary_unary(self): simple_method(self._stub) spans = self.memory_exporter.get_finished_spans() From 38478661336f4634dbd75c871741cc46911fa5ae Mon Sep 17 00:00:00 2001 From: sengjea Date: Wed, 16 Jun 2021 17:09:11 +0100 Subject: [PATCH 2/7] address lints --- CHANGELOG.md | 2 ++ .../opentelemetry/instrumentation/grpc/_client.py | 14 ++++++++++---- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8edd0ac74b..cca9d22f09 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.3.0-0.22b0...HEAD) +- `opentelemetry-instrumentation-grpc` Fixed asynchonous unary call traces + ([#536](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/536)) ### Changed - `opentelemetry-instrumentation-tornado` properly instrument work done in tornado on_finish method. diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py index 74f2f336f2..fafd10cc4f 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py @@ -103,7 +103,9 @@ def intercept_unary(self, request, metadata, client_info, invoker): else: mutable_metadata = OrderedDict(metadata) span = self._start_span(client_info.full_method) - with trace.use_span(span, record_exception=False, set_status_on_exception=False): + with trace.use_span( + span, record_exception=False, set_status_on_exception=False + ): try: inject(mutable_metadata, setter=_carrier_setter) metadata = tuple(mutable_metadata.items()) @@ -119,7 +121,8 @@ def intercept_unary(self, request, metadata, client_info, invoker): except Exception as exc: if isinstance(exc, grpc.RpcError): span.set_attribute( - SpanAttributes.RPC_GRPC_STATUS_CODE, exc.code().value[0] + SpanAttributes.RPC_GRPC_STATUS_CODE, + exc.code().value[0], ) span.set_status( Status( @@ -182,7 +185,9 @@ def intercept_stream( mutable_metadata = OrderedDict(metadata) span = self._start_span(client_info.full_method) - with trace.use_span(span, record_exception=False, set_status_on_exception=False): + with trace.use_span( + span, record_exception=False, set_status_on_exception=False + ): try: inject(mutable_metadata, setter=_carrier_setter) metadata = tuple(mutable_metadata.items()) @@ -199,7 +204,8 @@ def intercept_stream( except Exception as exc: if isinstance(exc, grpc.RpcError): span.set_attribute( - SpanAttributes.RPC_GRPC_STATUS_CODE, exc.code().value[0] + SpanAttributes.RPC_GRPC_STATUS_CODE, + exc.code().value[0], ) span.set_status( Status( From cf299c22e403a7f43ec1a445ad1a90b46acd494f Mon Sep 17 00:00:00 2001 From: sengjea Date: Wed, 16 Jun 2021 17:09:11 +0100 Subject: [PATCH 3/7] addressing review --- .../instrumentation/grpc/_client.py | 46 ++++++++++++------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py index fafd10cc4f..4655041423 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py @@ -64,7 +64,7 @@ class OpenTelemetryClientInterceptor( def __init__(self, tracer): self._tracer = tracer - def _start_span(self, method): + def _start_span(self, method, **kwargs): service, meth = method.lstrip("/").split("/", 1) attributes = { SpanAttributes.RPC_SYSTEM: "grpc", @@ -73,14 +73,17 @@ def _start_span(self, method): SpanAttributes.RPC_SERVICE: service, } - return self._tracer.start_span( - name=method, kind=trace.SpanKind.CLIENT, attributes=attributes, + return self._tracer.start_as_current_span( + name=method, + kind=trace.SpanKind.CLIENT, + attributes=attributes, + **kwargs, ) # pylint:disable=no-self-use def _trace_result(self, span, rpc_info, result): - # If the RPC is called asynchronously, release the guard and add a - # callback so that the span can be finished once the future is done. + # If the RPC is called asynchronously, add a callback to end the span + # when the future is done, else end the span immediately if isinstance(result, grpc.Future): result.add_done_callback( _make_future_done_callback(span, rpc_info) @@ -102,10 +105,13 @@ def intercept_unary(self, request, metadata, client_info, invoker): mutable_metadata = OrderedDict() else: mutable_metadata = OrderedDict(metadata) - span = self._start_span(client_info.full_method) - with trace.use_span( - span, record_exception=False, set_status_on_exception=False - ): + with self._start_span( + client_info.full_method, + end_on_exit=False, + record_exception=False, + set_status_on_exception=False, + ) as span: + result = None try: inject(mutable_metadata, setter=_carrier_setter) metadata = tuple(mutable_metadata.items()) @@ -131,8 +137,10 @@ def intercept_unary(self, request, metadata, client_info, invoker): ) ) span.record_exception(exc) - span.end() raise exc + finally: + if not result: + span.end() return self._trace_result(span, rpc_info, result) # For RPCs that stream responses, the result can be a generator. To record @@ -146,8 +154,7 @@ def _intercept_server_stream( else: mutable_metadata = OrderedDict(metadata) - span = self._start_span(client_info.full_method) - with trace.use_span(span, end_on_exit=True): + with self._start_span(client_info.full_method) as span: inject(mutable_metadata, setter=_carrier_setter) metadata = tuple(mutable_metadata.items()) rpc_info = RpcInfo( @@ -184,10 +191,13 @@ def intercept_stream( else: mutable_metadata = OrderedDict(metadata) - span = self._start_span(client_info.full_method) - with trace.use_span( - span, record_exception=False, set_status_on_exception=False - ): + with self._start_span( + client_info.full_method, + end_on_exit=False, + record_exception=False, + set_status_on_exception=False, + ) as span: + result = None try: inject(mutable_metadata, setter=_carrier_setter) metadata = tuple(mutable_metadata.items()) @@ -214,7 +224,9 @@ def intercept_stream( ) ) span.record_exception(exc) - span.end() raise exc + finally: + if not result: + span.end() return self._trace_result(span, rpc_info, result) From d738ce7f570955fc9f3dc7b8f06440b04ec604ce Mon Sep 17 00:00:00 2001 From: sengjea Date: Fri, 25 Jun 2021 15:25:45 +0100 Subject: [PATCH 4/7] refactor duplicate code --- .../instrumentation/grpc/_client.py | 47 +------------------ 1 file changed, 2 insertions(+), 45 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py index 4655041423..56c771e2ce 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py @@ -100,7 +100,7 @@ def _trace_result(self, span, rpc_info, result): span.end() return result - def intercept_unary(self, request, metadata, client_info, invoker): + def _intercept(self, request, metadata, client_info, invoker): if not metadata: mutable_metadata = OrderedDict() else: @@ -186,47 +186,4 @@ def intercept_stream( request_or_iterator, metadata, client_info, invoker ) - if not metadata: - mutable_metadata = OrderedDict() - else: - mutable_metadata = OrderedDict(metadata) - - with self._start_span( - client_info.full_method, - end_on_exit=False, - record_exception=False, - set_status_on_exception=False, - ) as span: - result = None - try: - inject(mutable_metadata, setter=_carrier_setter) - metadata = tuple(mutable_metadata.items()) - rpc_info = RpcInfo( - full_method=client_info.full_method, - metadata=metadata, - timeout=client_info.timeout, - request=request_or_iterator, - ) - - rpc_info.request = request_or_iterator - - result = invoker(request_or_iterator, metadata) - except Exception as exc: - if isinstance(exc, grpc.RpcError): - span.set_attribute( - SpanAttributes.RPC_GRPC_STATUS_CODE, - exc.code().value[0], - ) - span.set_status( - Status( - status_code=StatusCode.ERROR, - description="{}: {}".format(type(exc).__name__, exc), - ) - ) - span.record_exception(exc) - raise exc - finally: - if not result: - span.end() - - return self._trace_result(span, rpc_info, result) + return self._intercept(request_or_iterator, metadata, client_info, invoker) From 7aacaa412500b87208c5ce61ac131070cfd30c09 Mon Sep 17 00:00:00 2001 From: sengjea Date: Sun, 27 Jun 2021 11:50:29 +0100 Subject: [PATCH 5/7] add missing function in refac --- .../src/opentelemetry/instrumentation/grpc/_client.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py index 56c771e2ce..370b85bb13 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py @@ -143,6 +143,9 @@ def _intercept(self, request, metadata, client_info, invoker): span.end() return self._trace_result(span, rpc_info, result) + def intercept_unary(self, request, metadata, client_info, invoker): + return self._intercept(request, metadata, client_info, invoker) + # For RPCs that stream responses, the result can be a generator. To record # the span across the generated responses and detect any errors, we wrap # the result in a new generator that yields the response values. From 7ff649456d56c861d700c9eda6d04c4d76ac7175 Mon Sep 17 00:00:00 2001 From: sengjea Date: Tue, 29 Jun 2021 09:46:10 +0100 Subject: [PATCH 6/7] tox -e generate --- .../src/opentelemetry/instrumentation/grpc/_client.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py index 370b85bb13..a2603bb6be 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py @@ -189,4 +189,6 @@ def intercept_stream( request_or_iterator, metadata, client_info, invoker ) - return self._intercept(request_or_iterator, metadata, client_info, invoker) + return self._intercept( + request_or_iterator, metadata, client_info, invoker + ) From 761374b1dd3223daf37968f686064116101c1726 Mon Sep 17 00:00:00 2001 From: sengjea Date: Mon, 5 Jul 2021 13:31:34 +0100 Subject: [PATCH 7/7] reinsert missing changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9cd9e7170d..1203c8702f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.3.0-0.22b0...HEAD) +- Include Flask 2.0 as compatible with existing flask instrumentation + ([#545](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/545)) ### Changed - `opentelemetry-instrumentation-tornado` properly instrument work done in tornado on_finish method.