Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gRPC instrumentation: client additions #269

Merged
merged 13 commits into from
Feb 5, 2021
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/246))
- Update TraceState to adhere to specs
([#276](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/276))
- `opentelemetry-instrumentation-grpc` Updated client attributes, added tests, fixed examples, docs
([#269](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/269))

### Removed
- Remove Configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,29 +185,33 @@ class GrpcInstrumentorClient(BaseInstrumentor):

"""

# Figures out which channel type we need to wrap
def _which_channel(self, kwargs):
# handle legacy argument
if "channel_type" in kwargs:
if kwargs.get("channel_type") == "secure":
return ("secure_channel",)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure you want to return different types (tuple or list)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh! I didn't even realize. I'll fix it, good catch.

return ("insecure_channel",)

# handle modern arguments
types = []
for ctype in ("secure_channel", "insecure_channel"):
if kwargs.get(ctype, True):
types.append(ctype)

return tuple(types)

def _instrument(self, **kwargs):
exporter = kwargs.get("exporter", None)
interval = kwargs.get("interval", 30)
if kwargs.get("channel_type") == "secure":
for ctype in self._which_channel(kwargs):
_wrap(
"grpc",
"secure_channel",
partial(self.wrapper_fn, exporter, interval),
)

else:
_wrap(
"grpc",
"insecure_channel",
partial(self.wrapper_fn, exporter, interval),
"grpc", ctype, partial(self.wrapper_fn, exporter, interval),
)

def _uninstrument(self, **kwargs):
if kwargs.get("channel_type") == "secure":
unwrap(grpc, "secure_channel")

else:
unwrap(grpc, "insecure_channel")
for ctype in self._which_channel(kwargs):
unwrap(grpc, ctype)

def wrapper_fn(
self, exporter, interval, original_func, instance, args, kwargs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,16 @@ def __init__(self, tracer, exporter, interval):
)

def _start_span(self, method):
service, meth = method.lstrip("/").split("/", 1)
attributes = {
"rpc.system": "grpc",
"rpc.grpc.status_code": grpc.StatusCode.OK.value[0],
"rpc.method": meth,
"rpc.service": service,
}

return self._tracer.start_as_current_span(
name=method, kind=trace.SpanKind.CLIENT
name=method, kind=trace.SpanKind.CLIENT, attributes=attributes
)

# pylint:disable=no-self-use
Expand Down Expand Up @@ -133,6 +141,7 @@ def _trace_result(self, guarded_span, rpc_info, result, client_info):
self._metrics_recorder.record_bytes_in(
response.ByteSize(), client_info.full_method
)

return result

def _start_guarded_span(self, *args, **kwargs):
Expand Down Expand Up @@ -175,11 +184,14 @@ def intercept_unary(self, request, metadata, client_info, invoker):

try:
result = invoker(request, metadata)
except grpc.RpcError:
except grpc.RpcError as err:
guarded_span.generated_span.set_status(
Status(StatusCode.ERROR)
)
raise
guarded_span.generated_span.set_attribute(
"rpc.grpc.status_code", err.code().value[0]
)
raise err

return self._trace_result(
guarded_span, rpc_info, result, client_info
Expand Down Expand Up @@ -230,9 +242,12 @@ def _intercept_server_stream(
response.ByteSize(), client_info.full_method
)
yield response
except grpc.RpcError:
except grpc.RpcError as err:
span.set_status(Status(StatusCode.ERROR))
raise
span.set_attribute(
"rpc.grpc.status_code", err.code().value[0]
)
raise err

def intercept_stream(
self, request_or_iterator, metadata, client_info, invoker
Expand Down Expand Up @@ -268,11 +283,14 @@ def intercept_stream(

try:
result = invoker(request_or_iterator, metadata)
except grpc.RpcError:
except grpc.RpcError as err:
guarded_span.generated_span.set_status(
Status(StatusCode.ERROR)
)
raise
guarded_span.generated_span.set_attribute(
"rpc.grpc.status_code", err.code().value[0],
)
raise err

return self._trace_result(
guarded_span, rpc_info, result, client_info
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,33 +72,32 @@ def __init__(self, meter, span_kind):

def record_bytes_in(self, bytes_in, method):
if self._meter:
labels = {"method": method}
labels = {"rpc.method": method}
self._bytes_in.add(bytes_in, labels)

def record_bytes_out(self, bytes_out, method):
if self._meter:
labels = {"method": method}
labels = {"rpc.method": method}
self._bytes_out.add(bytes_out, labels)

@contextmanager
def record_latency(self, method):
start_time = time()
labels = {
"method": method,
"status_code": grpc.StatusCode.OK, # pylint:disable=no-member
"rpc.method": method,
"rpc.system": "grpc",
"rpc.grpc.status_code": grpc.StatusCode.OK.name,
}
try:
yield labels
except grpc.RpcError as exc: # pylint:disable=no-member
if self._meter:
# pylint: disable=no-member
labels["status_code"] = exc.code()
labels["rpc.grpc.status_code"] = exc.code().name
self._error_count.add(1, labels)
labels["error"] = True
labels["error"] = "true"
raise
finally:
if self._meter:
if "error" not in labels:
labels["error"] = False
elapsed_time = (time() - start_time) * 1000
self._duration.record(elapsed_time, labels)
Original file line number Diff line number Diff line change
Expand Up @@ -73,21 +73,21 @@ def _verify_success_records(self, num_bytes_out, num_bytes_in, method):

self.assertIsNotNone(bytes_out)
self.assertEqual(bytes_out.instrument.name, "grpcio/client/bytes_out")
self.assertEqual(bytes_out.labels, (("method", method),))
self.assertEqual(bytes_out.labels, (("rpc.method", method),))

self.assertIsNotNone(bytes_in)
self.assertEqual(bytes_in.instrument.name, "grpcio/client/bytes_in")
self.assertEqual(bytes_in.labels, (("method", method),))
self.assertEqual(bytes_in.labels, (("rpc.method", method),))

self.assertIsNotNone(duration)
self.assertEqual(duration.instrument.name, "grpcio/client/duration")
self.assertEqual(
duration.labels,
(
("error", False),
("method", method),
("status_code", grpc.StatusCode.OK),
),
self.assertSequenceEqual(
sorted(duration.labels),
[
("rpc.grpc.status_code", grpc.StatusCode.OK.name),
("rpc.method", method),
("rpc.system", "grpc"),
],
)

self.assertEqual(type(bytes_out.aggregator), SumAggregator)
Expand Down Expand Up @@ -116,6 +116,16 @@ def test_unary_unary(self):

self._verify_success_records(8, 8, "/GRPCTestServer/SimpleMethod")

self.assert_span_has_attributes(
span,
{
"rpc.method": "SimpleMethod",
"rpc.service": "GRPCTestServer",
"rpc.system": "grpc",
"rpc.grpc.status_code": grpc.StatusCode.OK.value[0],
},
)

def test_unary_stream(self):
server_streaming_method(self._stub)
spans = self.memory_exporter.get_finished_spans()
Expand All @@ -134,6 +144,16 @@ def test_unary_stream(self):
8, 40, "/GRPCTestServer/ServerStreamingMethod"
)

self.assert_span_has_attributes(
span,
{
"rpc.method": "ServerStreamingMethod",
"rpc.service": "GRPCTestServer",
"rpc.system": "grpc",
"rpc.grpc.status_code": grpc.StatusCode.OK.value[0],
},
)

def test_stream_unary(self):
client_streaming_method(self._stub)
spans = self.memory_exporter.get_finished_spans()
Expand All @@ -152,6 +172,16 @@ def test_stream_unary(self):
40, 8, "/GRPCTestServer/ClientStreamingMethod"
)

self.assert_span_has_attributes(
span,
{
"rpc.method": "ClientStreamingMethod",
"rpc.service": "GRPCTestServer",
"rpc.system": "grpc",
"rpc.grpc.status_code": grpc.StatusCode.OK.value[0],
},
)

def test_stream_stream(self):
bidirectional_streaming_method(self._stub)
spans = self.memory_exporter.get_finished_spans()
Expand All @@ -172,6 +202,16 @@ def test_stream_stream(self):
40, 40, "/GRPCTestServer/BidirectionalStreamingMethod"
)

self.assert_span_has_attributes(
span,
{
"rpc.method": "BidirectionalStreamingMethod",
"rpc.service": "GRPCTestServer",
"rpc.system": "grpc",
"rpc.grpc.status_code": grpc.StatusCode.OK.value[0],
},
)

def _verify_error_records(self, method):
# pylint: disable=protected-access,no-member
self.channel._interceptor.controller.tick()
Expand All @@ -195,21 +235,33 @@ def _verify_error_records(self, method):
self.assertIsNotNone(duration)

self.assertEqual(errors.instrument.name, "grpcio/client/errors")
self.assertEqual(
errors.labels,
(
("method", method),
("status_code", grpc.StatusCode.INVALID_ARGUMENT),
self.assertSequenceEqual(
sorted(errors.labels),
sorted(
(
(
"rpc.grpc.status_code",
grpc.StatusCode.INVALID_ARGUMENT.name,
),
("rpc.method", method),
("rpc.system", "grpc"),
)
),
)
self.assertEqual(errors.aggregator.checkpoint, 1)

self.assertEqual(
duration.labels,
(
("error", True),
("method", method),
("status_code", grpc.StatusCode.INVALID_ARGUMENT),
self.assertSequenceEqual(
sorted(duration.labels),
sorted(
(
("error", "true"),
("rpc.method", method),
("rpc.system", "grpc"),
(
"rpc.grpc.status_code",
grpc.StatusCode.INVALID_ARGUMENT.name,
),
)
),
)

Expand Down