Skip to content

Commit

Permalink
gRPC streaming bugfix (#260)
Browse files Browse the repository at this point in the history
  • Loading branch information
alertedsnake authored and lzchen committed Feb 23, 2021
1 parent a695af9 commit a5820c5
Show file tree
Hide file tree
Showing 3 changed files with 259 additions and 10 deletions.
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#308](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/308))
- `opentelemetry-instrumentation-boto` updated to set span attributes instead of overriding the resource.
([#310](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/310))
- `opentelemetry-instrumentation-grpc` Fix issue tracking child spans in streaming responses
([#260](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/260))
- `opentelemetry-instrumentation-grpc` Updated client attributes, added tests, fixed examples, docs
([#269](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/269))

## [0.17b0](https://github.com/open-telemetry/opentelemetry-python-contrib/releases/tag/v0.17b0) - 2021-01-20

Expand Down Expand Up @@ -78,8 +82,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/246))
- Update TraceState to adhere to specs
([#276](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/276))
- `opentelemetry-instrumentation-grpc` Updated client attributes, added tests, fixed examples, docs
([#269](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/269))

### Removed
- Remove Configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,15 @@ def intercept_service(self, continuation, handler_call_details):
def telemetry_wrapper(behavior, request_streaming, response_streaming):
def telemetry_interceptor(request_or_iterator, context):

# handle streaming responses specially
if response_streaming:
return self._intercept_server_stream(
behavior,
handler_call_details,
request_or_iterator,
context,
)

with self._set_remote_context(context):
with self._start_span(
handler_call_details, context
Expand All @@ -249,6 +258,7 @@ def telemetry_interceptor(request_or_iterator, context):
# And now we run the actual RPC.
try:
return behavior(request_or_iterator, context)

except Exception as error:
# Bare exceptions are likely to be gRPC aborts, which
# we handle in our context wrapper.
Expand All @@ -263,3 +273,23 @@ def telemetry_interceptor(request_or_iterator, context):
return _wrap_rpc_behavior(
continuation(handler_call_details), telemetry_wrapper
)

# Handle streaming responses separately - we have to do this
# to return a *new* generator or various upstream things
# get confused, or we'll lose the consistent trace
def _intercept_server_stream(
self, behavior, handler_call_details, request_or_iterator, context
):

with self._set_remote_context(context):
with self._start_span(handler_call_details, context) as span:
context = _OpenTelemetryServicerContext(context, span)

try:
yield from behavior(request_or_iterator, context)

except Exception as error:
# pylint:disable=unidiomatic-typecheck
if type(error) != Exception:
span.record_exception(error)
raise error
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@
from opentelemetry.test.test_base import TestBase
from opentelemetry.trace.status import StatusCode

from .protobuf.test_server_pb2 import Request, Response
from .protobuf.test_server_pb2_grpc import (
GRPCTestServerServicer,
add_GRPCTestServerServicer_to_server,
)


class UnaryUnaryMethodHandler(grpc.RpcMethodHandler):
def __init__(self, handler):
Expand All @@ -51,6 +57,23 @@ def service(self, handler_call_details):
return UnaryUnaryMethodHandler(self._unary_unary_handler)


class Servicer(GRPCTestServerServicer):
"""Our test servicer"""

# pylint:disable=C0103
def SimpleMethod(self, request, context):
return Response(
server_id=request.client_id, response_data=request.request_data,
)

# pylint:disable=C0103
def ServerStreamingMethod(self, request, context):
for data in ("one", "two", "three"):
yield Response(
server_id=request.client_id, response_data=data,
)


class TestOpenTelemetryServerInterceptor(TestBase):
def test_instrumentor(self):
def handler(request, context):
Expand Down Expand Up @@ -134,25 +157,146 @@ def test_create_span(self):
# Intercept gRPC calls...
interceptor = server_interceptor()

# No-op RPC handler
def handler(request, context):
return b""
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=1),
options=(("grpc.so_reuseport", 0),),
interceptors=[interceptor],
)
add_GRPCTestServerServicer_to_server(Servicer(), server)
port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel("localhost:{:d}".format(port))

rpc_call = "/GRPCTestServer/SimpleMethod"
request = Request(client_id=1, request_data="test")
msg = request.SerializeToString()
try:
server.start()
channel.unary_unary(rpc_call)(msg)
finally:
server.stop(None)

spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
span = spans_list[0]

self.assertEqual(span.name, rpc_call)
self.assertIs(span.kind, trace.SpanKind.SERVER)

# Check version and name in span's instrumentation info
self.check_span_instrumentation_info(
span, opentelemetry.instrumentation.grpc
)

# Check attributes
self.assert_span_has_attributes(
span,
{
"net.peer.ip": "[::1]",
"net.peer.name": "localhost",
"rpc.method": "SimpleMethod",
"rpc.service": "GRPCTestServer",
"rpc.system": "grpc",
"rpc.grpc.status_code": grpc.StatusCode.OK.value[0],
},
)

def test_create_two_spans(self):
"""Verify that the interceptor captures sub spans within the given
trace"""

class TwoSpanServicer(GRPCTestServerServicer):
# pylint:disable=C0103
def SimpleMethod(self, request, context):

# create another span
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span("child") as child:
child.add_event("child event")

return Response(
server_id=request.client_id,
response_data=request.request_data,
)

# Intercept gRPC calls...
interceptor = server_interceptor()

# setup the server
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=1),
options=(("grpc.so_reuseport", 0),),
interceptors=[interceptor],
)
add_GRPCTestServerServicer_to_server(TwoSpanServicer(), server)
port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel("localhost:{:d}".format(port))

server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),))
# setup the RPC
rpc_call = "/GRPCTestServer/SimpleMethod"
request = Request(client_id=1, request_data="test")
msg = request.SerializeToString()
try:
server.start()
channel.unary_unary(rpc_call)(msg)
finally:
server.stop(None)

spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 2)
child_span = spans_list[0]
parent_span = spans_list[1]

self.assertEqual(parent_span.name, rpc_call)
self.assertIs(parent_span.kind, trace.SpanKind.SERVER)

# Check version and name in span's instrumentation info
self.check_span_instrumentation_info(
parent_span, opentelemetry.instrumentation.grpc
)

# Check attributes
self.assert_span_has_attributes(
parent_span,
{
"net.peer.ip": "[::1]",
"net.peer.name": "localhost",
"rpc.method": "SimpleMethod",
"rpc.service": "GRPCTestServer",
"rpc.system": "grpc",
"rpc.grpc.status_code": grpc.StatusCode.OK.value[0],
},
)

# Check the child span
self.assertEqual(child_span.name, "child")
self.assertEqual(
parent_span.context.trace_id, child_span.context.trace_id
)

def test_create_span_streaming(self):
"""Check that the interceptor wraps calls with spans server-side, on a
streaming call."""

# Intercept gRPC calls...
interceptor = server_interceptor()

# setup the server
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=1),
options=(("grpc.so_reuseport", 0),),
interceptors=[interceptor],
)
add_GRPCTestServerServicer_to_server(Servicer(), server)
port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel("localhost:{:d}".format(port))

rpc_call = "TestServicer/handler"
# setup the RPC
rpc_call = "/GRPCTestServer/ServerStreamingMethod"
request = Request(client_id=1, request_data="test")
msg = request.SerializeToString()
try:
server.start()
channel.unary_unary(rpc_call)(b"")
list(channel.unary_stream(rpc_call)(msg))
finally:
server.stop(None)

Expand All @@ -174,13 +318,86 @@ def handler(request, context):
{
"net.peer.ip": "[::1]",
"net.peer.name": "localhost",
"rpc.method": "handler",
"rpc.service": "TestServicer",
"rpc.method": "ServerStreamingMethod",
"rpc.service": "GRPCTestServer",
"rpc.system": "grpc",
"rpc.grpc.status_code": grpc.StatusCode.OK.value[0],
},
)

def test_create_two_spans_streaming(self):
"""Verify that the interceptor captures sub spans in a
streaming call, within the given trace"""

class TwoSpanServicer(GRPCTestServerServicer):
# pylint:disable=C0103
def ServerStreamingMethod(self, request, context):

# create another span
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span("child") as child:
child.add_event("child event")

for data in ("one", "two", "three"):
yield Response(
server_id=request.client_id, response_data=data,
)

# Intercept gRPC calls...
interceptor = server_interceptor()

# setup the server
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=1),
options=(("grpc.so_reuseport", 0),),
interceptors=[interceptor],
)
add_GRPCTestServerServicer_to_server(TwoSpanServicer(), server)
port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel("localhost:{:d}".format(port))

# setup the RPC
rpc_call = "/GRPCTestServer/ServerStreamingMethod"
request = Request(client_id=1, request_data="test")
msg = request.SerializeToString()
try:
server.start()
list(channel.unary_stream(rpc_call)(msg))
finally:
server.stop(None)

spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 2)
child_span = spans_list[0]
parent_span = spans_list[1]

self.assertEqual(parent_span.name, rpc_call)
self.assertIs(parent_span.kind, trace.SpanKind.SERVER)

# Check version and name in span's instrumentation info
self.check_span_instrumentation_info(
parent_span, opentelemetry.instrumentation.grpc
)

# Check attributes
self.assert_span_has_attributes(
parent_span,
{
"net.peer.ip": "[::1]",
"net.peer.name": "localhost",
"rpc.method": "ServerStreamingMethod",
"rpc.service": "GRPCTestServer",
"rpc.system": "grpc",
"rpc.grpc.status_code": grpc.StatusCode.OK.value[0],
},
)

# Check the child span
self.assertEqual(child_span.name, "child")
self.assertEqual(
parent_span.context.trace_id, child_span.context.trace_id
)

def test_span_lifetime(self):
"""Check that the span is active for the duration of the call."""

Expand Down

0 comments on commit a5820c5

Please sign in to comment.