From a7aa662fe5bdb22ba0ae91158a49634f9cfeacc4 Mon Sep 17 00:00:00 2001 From: Michael Stella Date: Thu, 10 Dec 2020 13:44:26 -0500 Subject: [PATCH] Add more tests, fix compliance to semantics (#236) --- CHANGELOG.md | 4 + .../instrumentation/grpc/_server.py | 51 ++++-- .../tests/test_server_interceptor.py | 169 ++++++++++++++++-- 3 files changed, 190 insertions(+), 34 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a4408daae9..c1b98b072b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,10 +25,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#237](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/237)) - Add Prometheus Remote Write Exporter integration tests in opentelemetry-docker-tests ([#216](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/216)) +- `opentelemetry-instrumentation-grpc` Add tests for grpc span attributes, grpc `abort()` conditions + ([#236](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/236)) ### Changed - `opentelemetry-instrumentation-asgi`, `opentelemetry-instrumentation-wsgi` Return `None` for `CarrierGetter` if key not found ([#1374](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/233)) +- `opentelemetry-instrumentation-grpc` Comply with updated spec, rework tests + ([#236](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/236)) ## [0.16b1](https://github.com/open-telemetry/opentelemetry-python-contrib/releases/tag/v0.16b1) - 2020-11-26 diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py index 087cf4f9cc..3fe859f574 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py @@ -113,9 +113,12 @@ def set_trailing_metadata(self, *args, **kwargs): def abort(self, code, details): self.code = code self.details = details - self._active_span.set_attribute("rpc.grpc.status_code", code.name) + self._active_span.set_attribute("rpc.grpc.status_code", code.value[0]) self._active_span.set_status( - Status(status_code=StatusCode.ERROR, description=details) + Status( + status_code=StatusCode.ERROR, + description="{}:{}".format(code, details), + ) ) return self._servicer_context.abort(code, details) @@ -126,17 +129,25 @@ def set_code(self, code): self.code = code # use details if we already have it, otherwise the status description details = self.details or code.value[1] - self._active_span.set_attribute("rpc.grpc.status_code", code.name) - self._active_span.set_status( - Status(status_code=StatusCode.ERROR, description=details) - ) + self._active_span.set_attribute("rpc.grpc.status_code", code.value[0]) + if code != grpc.StatusCode.OK: + self._active_span.set_status( + Status( + status_code=StatusCode.ERROR, + description="{}:{}".format(code, details), + ) + ) return self._servicer_context.set_code(code) def set_details(self, details): self.details = details - self._active_span.set_status( - Status(status_code=StatusCode.ERROR, description=details) - ) + if self.code != grpc.StatusCode.OK: + self._active_span.set_status( + Status( + status_code=StatusCode.ERROR, + description="{}:{}".format(self.code, details), + ) + ) return self._servicer_context.set_details(details) @@ -181,12 +192,20 @@ def _set_remote_context(self, servicer_context): def _start_span(self, handler_call_details, context): + # standard attributes attributes = { - "rpc.method": handler_call_details.method, "rpc.system": "grpc", - "rpc.grpc.status_code": grpc.StatusCode.OK, + "rpc.grpc.status_code": grpc.StatusCode.OK.value[0], } + # if we have details about the call, split into service and method + if handler_call_details.method: + service, method = handler_call_details.method.lstrip("/").split( + "/", 1 + ) + attributes.update({"rpc.method": method, "rpc.service": service}) + + # add some attributes from the metadata metadata = dict(context.invocation_metadata()) if "user-agent" in metadata: attributes["rpc.user_agent"] = metadata["user-agent"] @@ -198,15 +217,15 @@ def _start_span(self, handler_call_details, context): # * ipv4:10.2.1.1:57284,127.0.0.1:57284 # try: - host, port = ( + ip, port = ( context.peer().split(",")[0].split(":", 1)[1].rsplit(":", 1) ) + attributes.update({"net.peer.ip": ip, "net.peer.port": port}) - # other telemetry sources convert this, so we will too - if host in ("[::1]", "127.0.0.1"): - host = "localhost" + # other telemetry sources add this, so we will too + if ip in ("[::1]", "127.0.0.1"): + attributes["net.peer.name"] = "localhost" - attributes.update({"net.peer.name": host, "net.peer.port": port}) except IndexError: logger.warning("Failed to parse peer address '%s'", context.peer()) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py index 13b535d841..cb61043c15 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py @@ -28,6 +28,7 @@ ) from opentelemetry.sdk import trace as trace_sdk from opentelemetry.test.test_base import TestBase +from opentelemetry.trace.status import StatusCode class UnaryUnaryMethodHandler(grpc.RpcMethodHandler): @@ -67,20 +68,37 @@ def handler(request, context): port = server.add_insecure_port("[::]:0") channel = grpc.insecure_channel("localhost:{:d}".format(port)) + rpc_call = "TestServicer/handler" try: server.start() - channel.unary_unary("test")(b"test") + channel.unary_unary(rpc_call)(b"test") finally: server.stop(None) spans_list = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans_list), 1) span = spans_list[0] - self.assertEqual(span.name, "test") + self.assertEqual(span.name, rpc_call) self.assertIs(span.kind, trace.SpanKind.SERVER) + + # Check version and name in span's instrumentation info self.check_span_instrumentation_info( span, opentelemetry.instrumentation.grpc ) + + # Check attributes + self.assert_span_has_attributes( + span, + { + "net.peer.ip": "[::1]", + "net.peer.name": "localhost", + "rpc.method": "handler", + "rpc.service": "TestServicer", + "rpc.system": "grpc", + "rpc.grpc.status_code": grpc.StatusCode.OK.value[0], + }, + ) + grpc_server_instrumentor.uninstrument() def test_uninstrument(self): @@ -100,9 +118,10 @@ def handler(request, context): port = server.add_insecure_port("[::]:0") channel = grpc.insecure_channel("localhost:{:d}".format(port)) + rpc_call = "TestServicer/test" try: server.start() - channel.unary_unary("test")(b"test") + channel.unary_unary(rpc_call)(b"test") finally: server.stop(None) @@ -130,9 +149,10 @@ def handler(request, context): port = server.add_insecure_port("[::]:0") channel = grpc.insecure_channel("localhost:{:d}".format(port)) + rpc_call = "TestServicer/handler" try: server.start() - channel.unary_unary("")(b"") + channel.unary_unary(rpc_call)(b"") finally: server.stop(None) @@ -140,7 +160,7 @@ def handler(request, context): self.assertEqual(len(spans_list), 1) span = spans_list[0] - self.assertEqual(span.name, "") + self.assertEqual(span.name, rpc_call) self.assertIs(span.kind, trace.SpanKind.SERVER) # Check version and name in span's instrumentation info @@ -148,6 +168,19 @@ def handler(request, context): span, opentelemetry.instrumentation.grpc ) + # Check attributes + self.assert_span_has_attributes( + span, + { + "net.peer.ip": "[::1]", + "net.peer.name": "localhost", + "rpc.method": "handler", + "rpc.service": "TestServicer", + "rpc.system": "grpc", + "rpc.grpc.status_code": grpc.StatusCode.OK.value[0], + }, + ) + def test_span_lifetime(self): """Check that the span is active for the duration of the call.""" @@ -174,7 +207,7 @@ def handler(request, context): active_span_before_call = trace.get_current_span() try: server.start() - channel.unary_unary("")(b"") + channel.unary_unary("TestServicer/handler")(b"") finally: server.stop(None) active_span_after_call = trace.get_current_span() @@ -208,20 +241,34 @@ def handler(request, context): try: server.start() - channel.unary_unary("")(b"") - channel.unary_unary("")(b"") + channel.unary_unary("TestServicer/handler")(b"") + channel.unary_unary("TestServicer/handler")(b"") finally: server.stop(None) self.assertEqual(len(active_spans_in_handler), 2) # pylint:disable=unbalanced-tuple-unpacking span1, span2 = active_spans_in_handler - # Spans should belong to separate traces, and each should be a root - # span + # Spans should belong to separate traces self.assertNotEqual(span1.context.span_id, span2.context.span_id) self.assertNotEqual(span1.context.trace_id, span2.context.trace_id) - self.assertIsNone(span1.parent) - self.assertIsNone(span1.parent) + + for span in (span1, span2): + # each should be a root span + self.assertIsNone(span2.parent) + + # check attributes + self.assert_span_has_attributes( + span, + { + "net.peer.ip": "[::1]", + "net.peer.name": "localhost", + "rpc.method": "handler", + "rpc.service": "TestServicer", + "rpc.system": "grpc", + "rpc.grpc.status_code": grpc.StatusCode.OK.value[0], + }, + ) def test_concurrent_server_spans(self): """Check that concurrent RPC calls don't interfere with each other. @@ -258,8 +305,12 @@ def handler(request, context): # Interleave calls so spans are active on each thread at the same # time with futures.ThreadPoolExecutor(max_workers=2) as tpe: - f1 = tpe.submit(channel.unary_unary(""), b"") - f2 = tpe.submit(channel.unary_unary(""), b"") + f1 = tpe.submit( + channel.unary_unary("TestServicer/handler"), b"" + ) + f2 = tpe.submit( + channel.unary_unary("TestServicer/handler"), b"" + ) futures.wait((f1, f2)) finally: server.stop(None) @@ -267,12 +318,94 @@ def handler(request, context): self.assertEqual(len(active_spans_in_handler), 2) # pylint:disable=unbalanced-tuple-unpacking span1, span2 = active_spans_in_handler - # Spans should belong to separate traces, and each should be a root - # span + # Spans should belong to separate traces self.assertNotEqual(span1.context.span_id, span2.context.span_id) self.assertNotEqual(span1.context.trace_id, span2.context.trace_id) - self.assertIsNone(span1.parent) - self.assertIsNone(span1.parent) + + for span in (span1, span2): + # each should be a root span + self.assertIsNone(span2.parent) + + # check attributes + self.assert_span_has_attributes( + span, + { + "net.peer.ip": "[::1]", + "net.peer.name": "localhost", + "rpc.method": "handler", + "rpc.service": "TestServicer", + "rpc.system": "grpc", + "rpc.grpc.status_code": grpc.StatusCode.OK.value[0], + }, + ) + + def test_abort(self): + """Check that we can catch an abort properly""" + + # Intercept gRPC calls... + interceptor = server_interceptor() + + # our detailed failure message + failure_message = "This is a test failure" + + # aborting RPC handler + def handler(request, context): + context.abort(grpc.StatusCode.FAILED_PRECONDITION, failure_message) + + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=1), + options=(("grpc.so_reuseport", 0),), + interceptors=[interceptor], + ) + + server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) + + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel("localhost:{:d}".format(port)) + + rpc_call = "TestServicer/handler" + + server.start() + # unfortunately, these are just bare exceptions in grpc... + with self.assertRaises(Exception): + channel.unary_unary(rpc_call)(b"") + server.stop(None) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + + self.assertEqual(span.name, rpc_call) + self.assertIs(span.kind, trace.SpanKind.SERVER) + + # Check version and name in span's instrumentation info + self.check_span_instrumentation_info( + span, opentelemetry.instrumentation.grpc + ) + + # make sure this span errored, with the right status and detail + self.assertEqual(span.status.status_code, StatusCode.ERROR) + self.assertEqual( + span.status.description, + "{}:{}".format( + grpc.StatusCode.FAILED_PRECONDITION, failure_message + ), + ) + + # Check attributes + self.assert_span_has_attributes( + span, + { + "net.peer.ip": "[::1]", + "net.peer.name": "localhost", + "rpc.method": "handler", + "rpc.service": "TestServicer", + "rpc.system": "grpc", + "rpc.grpc.status_code": grpc.StatusCode.FAILED_PRECONDITION.value[ + 0 + ], + }, + ) def get_latch(num):