Skip to content

Commit

Permalink
Add more tests, fix compliance to semantics (#236)
Browse files Browse the repository at this point in the history
  • Loading branch information
alertedsnake committed Dec 10, 2020
1 parent 6514f37 commit a7aa662
Show file tree
Hide file tree
Showing 3 changed files with 190 additions and 34 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#237](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/237))
- Add Prometheus Remote Write Exporter integration tests in opentelemetry-docker-tests
([#216](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/216))
- `opentelemetry-instrumentation-grpc` Add tests for grpc span attributes, grpc `abort()` conditions
([#236](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/236))

### Changed
- `opentelemetry-instrumentation-asgi`, `opentelemetry-instrumentation-wsgi` Return `None` for `CarrierGetter` if key not found
([#1374](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/233))
- `opentelemetry-instrumentation-grpc` Comply with updated spec, rework tests
([#236](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/236))

## [0.16b1](https://github.com/open-telemetry/opentelemetry-python-contrib/releases/tag/v0.16b1) - 2020-11-26

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,12 @@ def set_trailing_metadata(self, *args, **kwargs):
def abort(self, code, details):
self.code = code
self.details = details
self._active_span.set_attribute("rpc.grpc.status_code", code.name)
self._active_span.set_attribute("rpc.grpc.status_code", code.value[0])
self._active_span.set_status(
Status(status_code=StatusCode.ERROR, description=details)
Status(
status_code=StatusCode.ERROR,
description="{}:{}".format(code, details),
)
)
return self._servicer_context.abort(code, details)

Expand All @@ -126,17 +129,25 @@ def set_code(self, code):
self.code = code
# use details if we already have it, otherwise the status description
details = self.details or code.value[1]
self._active_span.set_attribute("rpc.grpc.status_code", code.name)
self._active_span.set_status(
Status(status_code=StatusCode.ERROR, description=details)
)
self._active_span.set_attribute("rpc.grpc.status_code", code.value[0])
if code != grpc.StatusCode.OK:
self._active_span.set_status(
Status(
status_code=StatusCode.ERROR,
description="{}:{}".format(code, details),
)
)
return self._servicer_context.set_code(code)

def set_details(self, details):
self.details = details
self._active_span.set_status(
Status(status_code=StatusCode.ERROR, description=details)
)
if self.code != grpc.StatusCode.OK:
self._active_span.set_status(
Status(
status_code=StatusCode.ERROR,
description="{}:{}".format(self.code, details),
)
)
return self._servicer_context.set_details(details)


Expand Down Expand Up @@ -181,12 +192,20 @@ def _set_remote_context(self, servicer_context):

def _start_span(self, handler_call_details, context):

# standard attributes
attributes = {
"rpc.method": handler_call_details.method,
"rpc.system": "grpc",
"rpc.grpc.status_code": grpc.StatusCode.OK,
"rpc.grpc.status_code": grpc.StatusCode.OK.value[0],
}

# if we have details about the call, split into service and method
if handler_call_details.method:
service, method = handler_call_details.method.lstrip("/").split(
"/", 1
)
attributes.update({"rpc.method": method, "rpc.service": service})

# add some attributes from the metadata
metadata = dict(context.invocation_metadata())
if "user-agent" in metadata:
attributes["rpc.user_agent"] = metadata["user-agent"]
Expand All @@ -198,15 +217,15 @@ def _start_span(self, handler_call_details, context):
# * ipv4:10.2.1.1:57284,127.0.0.1:57284
#
try:
host, port = (
ip, port = (
context.peer().split(",")[0].split(":", 1)[1].rsplit(":", 1)
)
attributes.update({"net.peer.ip": ip, "net.peer.port": port})

# other telemetry sources convert this, so we will too
if host in ("[::1]", "127.0.0.1"):
host = "localhost"
# other telemetry sources add this, so we will too
if ip in ("[::1]", "127.0.0.1"):
attributes["net.peer.name"] = "localhost"

attributes.update({"net.peer.name": host, "net.peer.port": port})
except IndexError:
logger.warning("Failed to parse peer address '%s'", context.peer())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
)
from opentelemetry.sdk import trace as trace_sdk
from opentelemetry.test.test_base import TestBase
from opentelemetry.trace.status import StatusCode


class UnaryUnaryMethodHandler(grpc.RpcMethodHandler):
Expand Down Expand Up @@ -67,20 +68,37 @@ def handler(request, context):
port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel("localhost:{:d}".format(port))

rpc_call = "TestServicer/handler"
try:
server.start()
channel.unary_unary("test")(b"test")
channel.unary_unary(rpc_call)(b"test")
finally:
server.stop(None)

spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
span = spans_list[0]
self.assertEqual(span.name, "test")
self.assertEqual(span.name, rpc_call)
self.assertIs(span.kind, trace.SpanKind.SERVER)

# Check version and name in span's instrumentation info
self.check_span_instrumentation_info(
span, opentelemetry.instrumentation.grpc
)

# Check attributes
self.assert_span_has_attributes(
span,
{
"net.peer.ip": "[::1]",
"net.peer.name": "localhost",
"rpc.method": "handler",
"rpc.service": "TestServicer",
"rpc.system": "grpc",
"rpc.grpc.status_code": grpc.StatusCode.OK.value[0],
},
)

grpc_server_instrumentor.uninstrument()

def test_uninstrument(self):
Expand All @@ -100,9 +118,10 @@ def handler(request, context):
port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel("localhost:{:d}".format(port))

rpc_call = "TestServicer/test"
try:
server.start()
channel.unary_unary("test")(b"test")
channel.unary_unary(rpc_call)(b"test")
finally:
server.stop(None)

Expand Down Expand Up @@ -130,24 +149,38 @@ def handler(request, context):
port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel("localhost:{:d}".format(port))

rpc_call = "TestServicer/handler"
try:
server.start()
channel.unary_unary("")(b"")
channel.unary_unary(rpc_call)(b"")
finally:
server.stop(None)

spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
span = spans_list[0]

self.assertEqual(span.name, "")
self.assertEqual(span.name, rpc_call)
self.assertIs(span.kind, trace.SpanKind.SERVER)

# Check version and name in span's instrumentation info
self.check_span_instrumentation_info(
span, opentelemetry.instrumentation.grpc
)

# Check attributes
self.assert_span_has_attributes(
span,
{
"net.peer.ip": "[::1]",
"net.peer.name": "localhost",
"rpc.method": "handler",
"rpc.service": "TestServicer",
"rpc.system": "grpc",
"rpc.grpc.status_code": grpc.StatusCode.OK.value[0],
},
)

def test_span_lifetime(self):
"""Check that the span is active for the duration of the call."""

Expand All @@ -174,7 +207,7 @@ def handler(request, context):
active_span_before_call = trace.get_current_span()
try:
server.start()
channel.unary_unary("")(b"")
channel.unary_unary("TestServicer/handler")(b"")
finally:
server.stop(None)
active_span_after_call = trace.get_current_span()
Expand Down Expand Up @@ -208,20 +241,34 @@ def handler(request, context):

try:
server.start()
channel.unary_unary("")(b"")
channel.unary_unary("")(b"")
channel.unary_unary("TestServicer/handler")(b"")
channel.unary_unary("TestServicer/handler")(b"")
finally:
server.stop(None)

self.assertEqual(len(active_spans_in_handler), 2)
# pylint:disable=unbalanced-tuple-unpacking
span1, span2 = active_spans_in_handler
# Spans should belong to separate traces, and each should be a root
# span
# Spans should belong to separate traces
self.assertNotEqual(span1.context.span_id, span2.context.span_id)
self.assertNotEqual(span1.context.trace_id, span2.context.trace_id)
self.assertIsNone(span1.parent)
self.assertIsNone(span1.parent)

for span in (span1, span2):
# each should be a root span
self.assertIsNone(span2.parent)

# check attributes
self.assert_span_has_attributes(
span,
{
"net.peer.ip": "[::1]",
"net.peer.name": "localhost",
"rpc.method": "handler",
"rpc.service": "TestServicer",
"rpc.system": "grpc",
"rpc.grpc.status_code": grpc.StatusCode.OK.value[0],
},
)

def test_concurrent_server_spans(self):
"""Check that concurrent RPC calls don't interfere with each other.
Expand Down Expand Up @@ -258,21 +305,107 @@ def handler(request, context):
# Interleave calls so spans are active on each thread at the same
# time
with futures.ThreadPoolExecutor(max_workers=2) as tpe:
f1 = tpe.submit(channel.unary_unary(""), b"")
f2 = tpe.submit(channel.unary_unary(""), b"")
f1 = tpe.submit(
channel.unary_unary("TestServicer/handler"), b""
)
f2 = tpe.submit(
channel.unary_unary("TestServicer/handler"), b""
)
futures.wait((f1, f2))
finally:
server.stop(None)

self.assertEqual(len(active_spans_in_handler), 2)
# pylint:disable=unbalanced-tuple-unpacking
span1, span2 = active_spans_in_handler
# Spans should belong to separate traces, and each should be a root
# span
# Spans should belong to separate traces
self.assertNotEqual(span1.context.span_id, span2.context.span_id)
self.assertNotEqual(span1.context.trace_id, span2.context.trace_id)
self.assertIsNone(span1.parent)
self.assertIsNone(span1.parent)

for span in (span1, span2):
# each should be a root span
self.assertIsNone(span2.parent)

# check attributes
self.assert_span_has_attributes(
span,
{
"net.peer.ip": "[::1]",
"net.peer.name": "localhost",
"rpc.method": "handler",
"rpc.service": "TestServicer",
"rpc.system": "grpc",
"rpc.grpc.status_code": grpc.StatusCode.OK.value[0],
},
)

def test_abort(self):
"""Check that we can catch an abort properly"""

# Intercept gRPC calls...
interceptor = server_interceptor()

# our detailed failure message
failure_message = "This is a test failure"

# aborting RPC handler
def handler(request, context):
context.abort(grpc.StatusCode.FAILED_PRECONDITION, failure_message)

server = grpc.server(
futures.ThreadPoolExecutor(max_workers=1),
options=(("grpc.so_reuseport", 0),),
interceptors=[interceptor],
)

server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),))

port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel("localhost:{:d}".format(port))

rpc_call = "TestServicer/handler"

server.start()
# unfortunately, these are just bare exceptions in grpc...
with self.assertRaises(Exception):
channel.unary_unary(rpc_call)(b"")
server.stop(None)

spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
span = spans_list[0]

self.assertEqual(span.name, rpc_call)
self.assertIs(span.kind, trace.SpanKind.SERVER)

# Check version and name in span's instrumentation info
self.check_span_instrumentation_info(
span, opentelemetry.instrumentation.grpc
)

# make sure this span errored, with the right status and detail
self.assertEqual(span.status.status_code, StatusCode.ERROR)
self.assertEqual(
span.status.description,
"{}:{}".format(
grpc.StatusCode.FAILED_PRECONDITION, failure_message
),
)

# Check attributes
self.assert_span_has_attributes(
span,
{
"net.peer.ip": "[::1]",
"net.peer.name": "localhost",
"rpc.method": "handler",
"rpc.service": "TestServicer",
"rpc.system": "grpc",
"rpc.grpc.status_code": grpc.StatusCode.FAILED_PRECONDITION.value[
0
],
},
)


def get_latch(num):
Expand Down

0 comments on commit a7aa662

Please sign in to comment.