Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gRPC streaming bugfix #260

Merged
merged 9 commits into from
Feb 8, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/246))
- Update TraceState to adhere to specs
([#276](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/276))
- `opentelemetry-instrumentation-grpc` Fix issue tracking child spans in streaming responses
([#260](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/260))

### Removed
- Remove Configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,15 @@ def intercept_service(self, continuation, handler_call_details):
def telemetry_wrapper(behavior, request_streaming, response_streaming):
def telemetry_interceptor(request_or_iterator, context):

# handle streaming responses specially
if response_streaming:
return self._intercept_server_stream(
behavior,
handler_call_details,
request_or_iterator,
context,
)

with self._set_remote_context(context):
with self._start_span(
handler_call_details, context
Expand All @@ -249,6 +258,7 @@ def telemetry_interceptor(request_or_iterator, context):
# And now we run the actual RPC.
try:
return behavior(request_or_iterator, context)

except Exception as error:
# Bare exceptions are likely to be gRPC aborts, which
# we handle in our context wrapper.
Expand All @@ -263,3 +273,23 @@ def telemetry_interceptor(request_or_iterator, context):
return _wrap_rpc_behavior(
continuation(handler_call_details), telemetry_wrapper
)

# Handle streaming responses separately - we have to do this
# to return a *new* generator or various upstream things
# get confused, or we'll lose the consistent trace
def _intercept_server_stream(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Single-use function here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what the issue is?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should have been Nit: single-use function here.

Having a function that is used only once has the disadvantage of making the reader make a mental copy of the arguments passed to it, having to look for the function code, mentally pasting the arguments, reading the function code and then going back to the starting point without any of the advantages of not having repeated code that a function provides.

Anyways, it is still a nitpick, you can leave it as is if you please.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But this must be a function, for the reason explained in a the comment.

self, behavior, handler_call_details, request_or_iterator, context
):

with self._set_remote_context(context):
with self._start_span(handler_call_details, context) as span:
context = _OpenTelemetryServicerContext(context, span)

try:
yield from behavior(request_or_iterator, context)

except Exception as error:
# pylint:disable=unidiomatic-typecheck
if type(error) != Exception:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please explain why is this necessary? record_exception expects an Exception. Even so, if this check was appropriate, wouldn't it be better to use isinstance?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes - in the gRPC world bare Exception instances are thrown by the system when you call grpc.abort(), and some of those shouldn't be treated as actual errors, for example the HTTP 404 series responses which are returned to the client but not recorded as errors in a trace.

This allows us to record anything except those as uncaught errors, since it's really unlikely anything else would throw a bare exception itself. It's not ideal but there just isn't any other way to distinguish these at this point in the program flow.

Using isinstance() would catch application exceptions here, which we want to raise normally.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I see. This is unusual code, but necessary because of a particularity of gRPC that the reader may not know about. I'll approve, but please add this explanation as a comment here. 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same exact code is on line 263, with a comment describing why it was done this way. Happy to duplicate the comment here if that would be preferred.

span.record_exception(error)
raise error
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@
from opentelemetry.test.test_base import TestBase
from opentelemetry.trace.status import StatusCode

from .protobuf.test_server_pb2 import Request, Response
from .protobuf.test_server_pb2_grpc import (
GRPCTestServerServicer,
add_GRPCTestServerServicer_to_server,
)


class UnaryUnaryMethodHandler(grpc.RpcMethodHandler):
def __init__(self, handler):
Expand All @@ -51,6 +57,23 @@ def service(self, handler_call_details):
return UnaryUnaryMethodHandler(self._unary_unary_handler)


class Servicer(GRPCTestServerServicer):
"""Our test servicer"""

# pylint:disable=C0103
def SimpleMethod(self, request, context):
return Response(
server_id=request.client_id, response_data=request.request_data,
)

# pylint:disable=C0103
def ServerStreamingMethod(self, request, context):
for data in ("one", "two", "three"):
yield Response(
server_id=request.client_id, response_data=data,
)


class TestOpenTelemetryServerInterceptor(TestBase):
def test_instrumentor(self):
def handler(request, context):
Expand Down Expand Up @@ -134,25 +157,146 @@ def test_create_span(self):
# Intercept gRPC calls...
interceptor = server_interceptor()

# No-op RPC handler
def handler(request, context):
return b""
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=1),
options=(("grpc.so_reuseport", 0),),
interceptors=[interceptor],
)
add_GRPCTestServerServicer_to_server(Servicer(), server)
port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel("localhost:{:d}".format(port))

rpc_call = "/GRPCTestServer/SimpleMethod"
request = Request(client_id=1, request_data="test")
msg = request.SerializeToString()
try:
server.start()
channel.unary_unary(rpc_call)(msg)
finally:
server.stop(None)

spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
span = spans_list[0]

self.assertEqual(span.name, rpc_call)
self.assertIs(span.kind, trace.SpanKind.SERVER)

# Check version and name in span's instrumentation info
self.check_span_instrumentation_info(
span, opentelemetry.instrumentation.grpc
)

# Check attributes
self.assert_span_has_attributes(
span,
{
"net.peer.ip": "[::1]",
"net.peer.name": "localhost",
"rpc.method": "SimpleMethod",
"rpc.service": "GRPCTestServer",
"rpc.system": "grpc",
"rpc.grpc.status_code": grpc.StatusCode.OK.value[0],
},
)

def test_create_two_spans(self):
"""Verify that the interceptor captures sub spans within the given
trace"""

class TwoSpanServicer(GRPCTestServerServicer):
# pylint:disable=C0103
def SimpleMethod(self, request, context):

# create another span
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span("child") as child:
child.add_event("child event")

return Response(
server_id=request.client_id,
response_data=request.request_data,
)

# Intercept gRPC calls...
interceptor = server_interceptor()

# setup the server
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=1),
options=(("grpc.so_reuseport", 0),),
interceptors=[interceptor],
)
add_GRPCTestServerServicer_to_server(TwoSpanServicer(), server)
port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel("localhost:{:d}".format(port))

server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),))
# setup the RPC
rpc_call = "/GRPCTestServer/SimpleMethod"
request = Request(client_id=1, request_data="test")
msg = request.SerializeToString()
try:
server.start()
channel.unary_unary(rpc_call)(msg)
finally:
server.stop(None)

spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 2)
child_span = spans_list[0]
parent_span = spans_list[1]

self.assertEqual(parent_span.name, rpc_call)
self.assertIs(parent_span.kind, trace.SpanKind.SERVER)

# Check version and name in span's instrumentation info
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: redundant comment

self.check_span_instrumentation_info(
parent_span, opentelemetry.instrumentation.grpc
)

# Check attributes
self.assert_span_has_attributes(
parent_span,
{
"net.peer.ip": "[::1]",
"net.peer.name": "localhost",
"rpc.method": "SimpleMethod",
"rpc.service": "GRPCTestServer",
"rpc.system": "grpc",
"rpc.grpc.status_code": grpc.StatusCode.OK.value[0],
},
)

# Check the child span
self.assertEqual(child_span.name, "child")
self.assertEqual(
parent_span.context.trace_id, child_span.context.trace_id
)

def test_create_span_streaming(self):
"""Check that the interceptor wraps calls with spans server-side, on a
streaming call."""

# Intercept gRPC calls...
interceptor = server_interceptor()

# setup the server
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=1),
options=(("grpc.so_reuseport", 0),),
interceptors=[interceptor],
)
add_GRPCTestServerServicer_to_server(Servicer(), server)
port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel("localhost:{:d}".format(port))

rpc_call = "TestServicer/handler"
# setup the RPC
rpc_call = "/GRPCTestServer/ServerStreamingMethod"
request = Request(client_id=1, request_data="test")
msg = request.SerializeToString()
try:
server.start()
channel.unary_unary(rpc_call)(b"")
list(channel.unary_stream(rpc_call)(msg))
finally:
server.stop(None)

Expand All @@ -174,13 +318,86 @@ def handler(request, context):
{
"net.peer.ip": "[::1]",
"net.peer.name": "localhost",
"rpc.method": "handler",
"rpc.service": "TestServicer",
"rpc.method": "ServerStreamingMethod",
"rpc.service": "GRPCTestServer",
"rpc.system": "grpc",
"rpc.grpc.status_code": grpc.StatusCode.OK.value[0],
},
)

def test_create_two_spans_streaming(self):
"""Verify that the interceptor captures sub spans in a
streaming call, within the given trace"""

class TwoSpanServicer(GRPCTestServerServicer):
# pylint:disable=C0103
def ServerStreamingMethod(self, request, context):

# create another span
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span("child") as child:
child.add_event("child event")

for data in ("one", "two", "three"):
yield Response(
server_id=request.client_id, response_data=data,
)

# Intercept gRPC calls...
interceptor = server_interceptor()

# setup the server
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=1),
options=(("grpc.so_reuseport", 0),),
interceptors=[interceptor],
)
add_GRPCTestServerServicer_to_server(TwoSpanServicer(), server)
port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel("localhost:{:d}".format(port))

# setup the RPC
rpc_call = "/GRPCTestServer/ServerStreamingMethod"
request = Request(client_id=1, request_data="test")
msg = request.SerializeToString()
try:
server.start()
list(channel.unary_stream(rpc_call)(msg))
finally:
server.stop(None)

spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 2)
child_span = spans_list[0]
parent_span = spans_list[1]

self.assertEqual(parent_span.name, rpc_call)
self.assertIs(parent_span.kind, trace.SpanKind.SERVER)

# Check version and name in span's instrumentation info
self.check_span_instrumentation_info(
parent_span, opentelemetry.instrumentation.grpc
)

# Check attributes
self.assert_span_has_attributes(
parent_span,
{
"net.peer.ip": "[::1]",
"net.peer.name": "localhost",
"rpc.method": "ServerStreamingMethod",
"rpc.service": "GRPCTestServer",
"rpc.system": "grpc",
"rpc.grpc.status_code": grpc.StatusCode.OK.value[0],
},
)

# Check the child span
self.assertEqual(child_span.name, "child")
self.assertEqual(
parent_span.context.trace_id, child_span.context.trace_id
)

def test_span_lifetime(self):
"""Check that the span is active for the duration of the call."""

Expand Down