From 0866ddd88e85a98bd6b3c64b45c2c12249ec971a Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Wed, 27 Feb 2019 14:36:22 +0100 Subject: [PATCH 01/15] Initial RPC test app and pkg dependencies --- setup.py | 1 + tests/__init__.py | 14 +++ tests/apps/grpc_server/README.md | 8 ++ tests/apps/grpc_server/__init__.py | 1 + tests/apps/grpc_server/stan.proto | 16 +++ tests/apps/grpc_server/stan_pb2.py | 139 ++++++++++++++++++++++++ tests/apps/grpc_server/stan_pb2_grpc.py | 46 ++++++++ tests/apps/grpc_server/stan_server.py | 64 +++++++++++ 8 files changed, 289 insertions(+) create mode 100644 tests/apps/grpc_server/README.md create mode 100644 tests/apps/grpc_server/__init__.py create mode 100644 tests/apps/grpc_server/stan.proto create mode 100644 tests/apps/grpc_server/stan_pb2.py create mode 100644 tests/apps/grpc_server/stan_pb2_grpc.py create mode 100644 tests/apps/grpc_server/stan_server.py diff --git a/setup.py b/setup.py index a81ac9d1..a7ed1517 100644 --- a/setup.py +++ b/setup.py @@ -72,6 +72,7 @@ def check_setuptools(): 'django>=1.11,<2.2', 'nose>=1.0', 'flask>=0.12.2', + 'grpcio>=1.18.0', 'lxml>=3.4', 'mock>=2.0.0', 'mysqlclient>=1.3.14;python_version>="3.5"', diff --git a/tests/__init__.py b/tests/__init__.py index d983039e..8e92c707 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -20,6 +20,20 @@ flask.start() +# Background RPC application +# +# Spawn the background RPC app that the tests will throw +# requests at. +import tests.apps.grpc_server +from .apps.grpc_server.stan_server import StanServicer +stan_servicer = StanServicer() +rpc_server_thread = threading.Thread(target=stan_servicer.start_server) +rpc_server_thread.daemon = True +rpc_server_thread.name = "Background RPC app" +print("Starting background RPC app...") +rpc_server_thread.start() + + if sys.version_info < (3, 7, 0): # Background Soap Server from .apps.soapserver4132 import soapserver diff --git a/tests/apps/grpc_server/README.md b/tests/apps/grpc_server/README.md new file mode 100644 index 00000000..c2fdddff --- /dev/null +++ b/tests/apps/grpc_server/README.md @@ -0,0 +1,8 @@ +To regenerate from the proto file: + +```bash +pip install grpcio grpcio-tools +python -m grpc_tools.protoc --proto_path=. ./stanaas.proto --python_out=. --grpc_python_out=. +``` + +Inspired from: https://technokeeda.com/programming/grpc-python-tutorial/ \ No newline at end of file diff --git a/tests/apps/grpc_server/__init__.py b/tests/apps/grpc_server/__init__.py new file mode 100644 index 00000000..779dcbfa --- /dev/null +++ b/tests/apps/grpc_server/__init__.py @@ -0,0 +1 @@ +# __all__ = ["digestor_pb2", "digestor_pb2_grpc"] \ No newline at end of file diff --git a/tests/apps/grpc_server/stan.proto b/tests/apps/grpc_server/stan.proto new file mode 100644 index 00000000..162cf9c7 --- /dev/null +++ b/tests/apps/grpc_server/stan.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; + +package stan; + +service Stan{ + rpc SayHello(IncomingMessage) returns (OutgoingMessage) {} +} + +message IncomingMessage{ + string ToDigest = 1; +} + +message OutgoingMessage{ + string Response = 1; + bool WasHandled = 2; +} diff --git a/tests/apps/grpc_server/stan_pb2.py b/tests/apps/grpc_server/stan_pb2.py new file mode 100644 index 00000000..83377661 --- /dev/null +++ b/tests/apps/grpc_server/stan_pb2.py @@ -0,0 +1,139 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: stan.proto + +import sys +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='stan.proto', + package='stan', + syntax='proto3', + serialized_options=None, + serialized_pb=_b('\n\nstan.proto\x12\x04stan\"#\n\x0fIncomingMessage\x12\x10\n\x08ToDigest\x18\x01 \x01(\t\"7\n\x0fOutgoingMessage\x12\x10\n\x08Response\x18\x01 \x01(\t\x12\x12\n\nWasHandled\x18\x02 \x01(\x08\x32\x42\n\x04Stan\x12:\n\x08SayHello\x12\x15.stan.IncomingMessage\x1a\x15.stan.OutgoingMessage\"\x00\x62\x06proto3') +) + + + + +_INCOMINGMESSAGE = _descriptor.Descriptor( + name='IncomingMessage', + full_name='stan.IncomingMessage', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='ToDigest', full_name='stan.IncomingMessage.ToDigest', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=20, + serialized_end=55, +) + + +_OUTGOINGMESSAGE = _descriptor.Descriptor( + name='OutgoingMessage', + full_name='stan.OutgoingMessage', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='Response', full_name='stan.OutgoingMessage.Response', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='WasHandled', full_name='stan.OutgoingMessage.WasHandled', index=1, + number=2, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=57, + serialized_end=112, +) + +DESCRIPTOR.message_types_by_name['IncomingMessage'] = _INCOMINGMESSAGE +DESCRIPTOR.message_types_by_name['OutgoingMessage'] = _OUTGOINGMESSAGE +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +IncomingMessage = _reflection.GeneratedProtocolMessageType('IncomingMessage', (_message.Message,), dict( + DESCRIPTOR = _INCOMINGMESSAGE, + __module__ = 'stan_pb2' + # @@protoc_insertion_point(class_scope:stan.IncomingMessage) + )) +_sym_db.RegisterMessage(IncomingMessage) + +OutgoingMessage = _reflection.GeneratedProtocolMessageType('OutgoingMessage', (_message.Message,), dict( + DESCRIPTOR = _OUTGOINGMESSAGE, + __module__ = 'stan_pb2' + # @@protoc_insertion_point(class_scope:stan.OutgoingMessage) + )) +_sym_db.RegisterMessage(OutgoingMessage) + + + +_STAN = _descriptor.ServiceDescriptor( + name='Stan', + full_name='stan.Stan', + file=DESCRIPTOR, + index=0, + serialized_options=None, + serialized_start=114, + serialized_end=180, + methods=[ + _descriptor.MethodDescriptor( + name='SayHello', + full_name='stan.Stan.SayHello', + index=0, + containing_service=None, + input_type=_INCOMINGMESSAGE, + output_type=_OUTGOINGMESSAGE, + serialized_options=None, + ), +]) +_sym_db.RegisterServiceDescriptor(_STAN) + +DESCRIPTOR.services_by_name['Stan'] = _STAN + +# @@protoc_insertion_point(module_scope) diff --git a/tests/apps/grpc_server/stan_pb2_grpc.py b/tests/apps/grpc_server/stan_pb2_grpc.py new file mode 100644 index 00000000..696942b6 --- /dev/null +++ b/tests/apps/grpc_server/stan_pb2_grpc.py @@ -0,0 +1,46 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +import grpc + +import tests.apps.grpc_server.stan_pb2 as stan__pb2 + + +class StanStub(object): + # missing associated documentation comment in .proto file + pass + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.SayHello = channel.unary_unary( + '/stan.Stan/SayHello', + request_serializer=stan__pb2.IncomingMessage.SerializeToString, + response_deserializer=stan__pb2.OutgoingMessage.FromString, + ) + + +class StanServicer(object): + # missing associated documentation comment in .proto file + pass + + def SayHello(self, request, context): + # missing associated documentation comment in .proto file + pass + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_StanServicer_to_server(servicer, server): + rpc_method_handlers = { + 'SayHello': grpc.unary_unary_rpc_method_handler( + servicer.SayHello, + request_deserializer=stan__pb2.IncomingMessage.FromString, + response_serializer=stan__pb2.OutgoingMessage.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'stan.Stan', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) diff --git a/tests/apps/grpc_server/stan_server.py b/tests/apps/grpc_server/stan_server.py new file mode 100644 index 00000000..ee1c23e9 --- /dev/null +++ b/tests/apps/grpc_server/stan_server.py @@ -0,0 +1,64 @@ +import grpc +import time +import hashlib +import tests.apps.grpc_server.stan_pb2 as stan_pb2 +import tests.apps.grpc_server.stan_pb2_grpc as stan_pb2_grpc +from concurrent import futures + +class StanServicer(stan_pb2_grpc.StanServicer): + """ + gRPC server for Stan Service + """ + def __init__(self, *args, **kwargs): + self.server_port = 5003 + + def SayHello(self, request, context): + """ + Implementation of the rpc SayHello declared in the proto + file above. + """ + # get the string from the incoming request + to_be_digested = request.SayHello + + # digest and get the string representation + # from the digestor + hasher = hashlib.sha256() + hasher.update(to_be_digested.encode()) + response = hasher.hexdigest() + + # print the output here + print(response) + + result = {'Digested': response, 'WasDigested': True} + + return stan_pb2.OutgoingMessage(**result) + + def start_server(self): + """ + Function which actually starts the gRPC server, and preps + it for serving incoming connections + """ + # declare a server object with desired number + # of thread pool workers. + digestor_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + + # This line can be ignored + stan_pb2_grpc.add_StanServicer_to_server(StanServicer(),digestor_server) + + # bind the server to the port defined above + digestor_server.add_insecure_port('[::]:{}'.format(self.server_port)) + + # start the server + digestor_server.start() + print ('Stan as a Service RPC Server running ...') + + try: + # need an infinite loop since the above + # code is non blocking, and if I don't do this + # the program will exit + while True: + time.sleep(60*60*60) + except KeyboardInterrupt: + digestor_server.stop(0) + print('Stan as a Service RPC Server Stopped ...') + From ac6d65fd1ef21e17c9ce235eae57767f0b5cc15d Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Mon, 15 Jul 2019 18:09:55 +0200 Subject: [PATCH 02/15] Propagator Improvements: - New binary propagator for upcoming grpc instrumentation - Fixed handling/processing of level header --- instana/binary_propagator.py | 83 ++++++++++++++++++++++++++++++++++++ instana/http_propagator.py | 16 ++++--- instana/span_context.py | 22 ++++++++++ instana/text_propagator.py | 20 ++++++--- 4 files changed, 130 insertions(+), 11 deletions(-) create mode 100644 instana/binary_propagator.py create mode 100644 instana/span_context.py diff --git a/instana/binary_propagator.py b/instana/binary_propagator.py new file mode 100644 index 00000000..cfad7050 --- /dev/null +++ b/instana/binary_propagator.py @@ -0,0 +1,83 @@ +from __future__ import absolute_import + +import opentracing as ot + +from .log import logger +from .util import header_to_id +from .span_context import InstanaSpanContext + + +class BinaryPropagator(): + """ + A Propagator for TEXT_MAP. + """ + HEADER_KEY_T = b'x-instana-t' + HEADER_KEY_S = b'x-instana-s' + HEADER_KEY_L = b'x-instana-l' + + def inject(self, span_context, carrier): + try: + trace_id = str.encode(span_context.trace_id) + span_id = str.encode(span_context.span_id) + level = str.encode("1") + + if type(carrier) is dict or hasattr(carrier, "__dict__"): + carrier[self.HEADER_KEY_T] = trace_id + carrier[self.HEADER_KEY_S] = span_id + carrier[self.HEADER_KEY_L] = level + elif type(carrier) is list: + carrier.append((self.HEADER_KEY_T, trace_id)) + carrier.append((self.HEADER_KEY_S, span_id)) + carrier.append((self.HEADER_KEY_L, level)) + elif type(carrier) is tuple: + carrier = carrier.__add__(((self.HEADER_KEY_T, trace_id),)) + carrier = carrier.__add__(((self.HEADER_KEY_S, span_id),)) + carrier = carrier.__add__(((self.HEADER_KEY_L, level),)) + elif hasattr(carrier, '__setitem__'): + carrier.__setitem__(self.HEADER_KEY_T, trace_id) + carrier.__setitem__(self.HEADER_KEY_S, span_id) + carrier.__setitem__(self.HEADER_KEY_L, level) + else: + raise Exception("Unsupported carrier type", type(carrier)) + + return carrier + except Exception: + logger.debug("inject error:", exc_info=True) + + def extract(self, carrier): # noqa + trace_id = None + span_id = None + level = None + + try: + if type(carrier) is dict or hasattr(carrier, "__getitem__"): + dc = carrier + elif hasattr(carrier, "__dict__"): + dc = carrier.__dict__ + elif type(carrier) is list: + dc = dict(carrier) + else: + raise ot.SpanContextCorruptedException() + + for key, value in dc.items(): + if type(key) is str: + key = str.encode(key) + + if self.HEADER_KEY_T == key: + trace_id = header_to_id(value) + elif self.HEADER_KEY_S == key: + span_id = header_to_id(value) + elif self.HEADER_KEY_L == key: + level = value + + ctx = None + if trace_id is not None and span_id is not None: + ctx = InstanaSpanContext(span_id=span_id, + trace_id=trace_id, + level=level, + baggage={}, + sampled=True) + return ctx + + except Exception: + logger.debug("extract error:", exc_info=True) diff --git a/instana/http_propagator.py b/instana/http_propagator.py index 2413b14b..b4bde7d0 100644 --- a/instana/http_propagator.py +++ b/instana/http_propagator.py @@ -1,7 +1,7 @@ from __future__ import absolute_import import opentracing as ot -from basictracer.context import SpanContext +from .span_context import InstanaSpanContext from .log import logger from .util import header_to_id @@ -62,6 +62,7 @@ def inject(self, span_context, carrier): def extract(self, carrier): # noqa trace_id = None span_id = None + level = 1 try: if type(carrier) is dict or hasattr(carrier, "__getitem__"): @@ -82,18 +83,23 @@ def extract(self, carrier): # noqa trace_id = header_to_id(dc[key]) elif self.LC_HEADER_KEY_S == lc_key: span_id = header_to_id(dc[key]) + elif self.LC_HEADER_KEY_L == lc_key: + level = dc[key] elif self.ALT_LC_HEADER_KEY_T == lc_key: trace_id = header_to_id(dc[key]) elif self.ALT_LC_HEADER_KEY_S == lc_key: span_id = header_to_id(dc[key]) + elif self.ALT_LC_HEADER_KEY_L == lc_key: + level = dc[key] ctx = None if trace_id is not None and span_id is not None: - ctx = SpanContext(span_id=span_id, - trace_id=trace_id, - baggage={}, - sampled=True) + ctx = InstanaSpanContext(span_id=span_id, + trace_id=trace_id, + level=level, + baggage={}, + sampled=True) return ctx except Exception: diff --git a/instana/span_context.py b/instana/span_context.py new file mode 100644 index 00000000..6fd4d120 --- /dev/null +++ b/instana/span_context.py @@ -0,0 +1,22 @@ + +from basictracer.context import SpanContext + + +class InstanaSpanContext(SpanContext): + """ + SpanContext based on the Basic tracer implementation. + We subclass this so that we can also store 'level' and eventually + remove the basictracer dependency altogether. + """ + def __init__( + self, + trace_id=None, + span_id=None, + baggage=None, + sampled=True, + level=1): + self.level = level + + super(InstanaSpanContext, self).__init__(trace_id, span_id, baggage, sampled) + + diff --git a/instana/text_propagator.py b/instana/text_propagator.py index 7a5bdbd2..a71981b2 100644 --- a/instana/text_propagator.py +++ b/instana/text_propagator.py @@ -1,7 +1,7 @@ from __future__ import absolute_import import opentracing as ot -from basictracer.context import SpanContext +from .span_context import InstanaSpanContext from .log import logger from .util import header_to_id @@ -11,7 +11,6 @@ class TextPropagator(): """ A Propagator for TEXT_MAP. """ - HEADER_KEY_T = 'X-INSTANA-T' HEADER_KEY_S = 'X-INSTANA-S' HEADER_KEY_L = 'X-INSTANA-L' @@ -29,6 +28,10 @@ def inject(self, span_context, carrier): carrier.append((self.HEADER_KEY_T, trace_id)) carrier.append((self.HEADER_KEY_S, span_id)) carrier.append((self.HEADER_KEY_L, "1")) + elif type(carrier) is tuple: + carrier = carrier.__add__(((self.HEADER_KEY_T, trace_id),)) + carrier = carrier.__add__(((self.HEADER_KEY_S, span_id),)) + carrier = carrier.__add__(((self.HEADER_KEY_L, "1"),)) elif hasattr(carrier, '__setitem__'): carrier.__setitem__(self.HEADER_KEY_T, trace_id) carrier.__setitem__(self.HEADER_KEY_S, span_id) @@ -36,12 +39,14 @@ def inject(self, span_context, carrier): else: raise Exception("Unsupported carrier type", type(carrier)) + return carrier except Exception: logger.debug("inject error:", exc_info=True) def extract(self, carrier): # noqa trace_id = None span_id = None + level = 1 try: if type(carrier) is dict or hasattr(carrier, "__getitem__"): @@ -58,13 +63,16 @@ def extract(self, carrier): # noqa trace_id = header_to_id(dc[key]) elif self.HEADER_KEY_S == key: span_id = header_to_id(dc[key]) + elif self.HEADER_KEY_L == key: + level = dc[key] ctx = None if trace_id is not None and span_id is not None: - ctx = SpanContext(span_id=span_id, - trace_id=trace_id, - baggage={}, - sampled=True) + ctx = InstanaSpanContext(span_id=span_id, + trace_id=trace_id, + level=level, + baggage={}, + sampled=True) return ctx except Exception: From c12efd687b65b1af0fbf960a53354bc8870e0fff Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Mon, 15 Jul 2019 18:12:07 +0200 Subject: [PATCH 03/15] Register new Binary propagator --- instana/tracer.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/instana/tracer.py b/instana/tracer.py index e35b5974..ea0b4fe6 100644 --- a/instana/tracer.py +++ b/instana/tracer.py @@ -7,13 +7,14 @@ import opentracing as ot from basictracer import BasicTracer -from basictracer.context import SpanContext +from .binary_propagator import BinaryPropagator from .http_propagator import HTTPPropagator +from .text_propagator import TextPropagator +from .span_context import InstanaSpanContext from .options import Options from .recorder import InstanaRecorder, InstanaSampler from .span import InstanaSpan -from .text_propagator import TextPropagator from .util import generate_id @@ -28,6 +29,7 @@ def __init__(self, options=Options(), scope_manager=None, recorder=None): self._propagators[ot.Format.HTTP_HEADERS] = HTTPPropagator() self._propagators[ot.Format.TEXT_MAP] = TextPropagator() + self._propagators[ot.Format.BINARY] = BinaryPropagator() def handle_fork(self): # Nothing to do for the Tracer; Pass onto Recorder @@ -83,7 +85,7 @@ def start_span(self, # Assemble the child ctx gid = generate_id() - ctx = SpanContext(span_id=gid) + ctx = InstanaSpanContext(span_id=gid) if parent_ctx is not None: if parent_ctx._baggage is not None: ctx._baggage = parent_ctx._baggage.copy() @@ -112,7 +114,7 @@ def start_span(self, def inject(self, span_context, format, carrier): if format in self._propagators: - self._propagators[format].inject(span_context, carrier) + return self._propagators[format].inject(span_context, carrier) else: raise ot.UnsupportedFormatException() From 1dbb7d2cff5bbb77091d60a5b6977952b1412caf Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Mon, 15 Jul 2019 18:12:50 +0200 Subject: [PATCH 04/15] Updated GRPC application for the test suite --- tests/apps/grpc_server/stan.proto | 13 +++--- tests/apps/grpc_server/stan_pb2.py | 54 ++++++++++++------------- tests/apps/grpc_server/stan_pb2_grpc.py | 18 ++++----- tests/apps/grpc_server/stan_server.py | 39 ++++++++---------- 4 files changed, 59 insertions(+), 65 deletions(-) diff --git a/tests/apps/grpc_server/stan.proto b/tests/apps/grpc_server/stan.proto index 162cf9c7..46e977fb 100644 --- a/tests/apps/grpc_server/stan.proto +++ b/tests/apps/grpc_server/stan.proto @@ -3,14 +3,15 @@ syntax = "proto3"; package stan; service Stan{ - rpc SayHello(IncomingMessage) returns (OutgoingMessage) {} + rpc AskQuestion(QuestionRequest) returns (QuestionResponse) {} } -message IncomingMessage{ - string ToDigest = 1; + +message QuestionRequest { + string question = 1; } -message OutgoingMessage{ - string Response = 1; - bool WasHandled = 2; +message QuestionResponse { + string answer = 1; + bool was_answered = 2; } diff --git a/tests/apps/grpc_server/stan_pb2.py b/tests/apps/grpc_server/stan_pb2.py index 83377661..25a7678c 100644 --- a/tests/apps/grpc_server/stan_pb2.py +++ b/tests/apps/grpc_server/stan_pb2.py @@ -19,21 +19,21 @@ package='stan', syntax='proto3', serialized_options=None, - serialized_pb=_b('\n\nstan.proto\x12\x04stan\"#\n\x0fIncomingMessage\x12\x10\n\x08ToDigest\x18\x01 \x01(\t\"7\n\x0fOutgoingMessage\x12\x10\n\x08Response\x18\x01 \x01(\t\x12\x12\n\nWasHandled\x18\x02 \x01(\x08\x32\x42\n\x04Stan\x12:\n\x08SayHello\x12\x15.stan.IncomingMessage\x1a\x15.stan.OutgoingMessage\"\x00\x62\x06proto3') + serialized_pb=_b('\n\nstan.proto\x12\x04stan\"#\n\x0fQuestionRequest\x12\x10\n\x08question\x18\x01 \x01(\t\"8\n\x10QuestionResponse\x12\x0e\n\x06\x61nswer\x18\x01 \x01(\t\x12\x14\n\x0cwas_answered\x18\x02 \x01(\x08\x32\x46\n\x04Stan\x12>\n\x0b\x41skQuestion\x12\x15.stan.QuestionRequest\x1a\x16.stan.QuestionResponse\"\x00\x62\x06proto3') ) -_INCOMINGMESSAGE = _descriptor.Descriptor( - name='IncomingMessage', - full_name='stan.IncomingMessage', +_QUESTIONREQUEST = _descriptor.Descriptor( + name='QuestionRequest', + full_name='stan.QuestionRequest', filename=None, file=DESCRIPTOR, containing_type=None, fields=[ _descriptor.FieldDescriptor( - name='ToDigest', full_name='stan.IncomingMessage.ToDigest', index=0, + name='question', full_name='stan.QuestionRequest.question', index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, @@ -56,22 +56,22 @@ ) -_OUTGOINGMESSAGE = _descriptor.Descriptor( - name='OutgoingMessage', - full_name='stan.OutgoingMessage', +_QUESTIONRESPONSE = _descriptor.Descriptor( + name='QuestionResponse', + full_name='stan.QuestionResponse', filename=None, file=DESCRIPTOR, containing_type=None, fields=[ _descriptor.FieldDescriptor( - name='Response', full_name='stan.OutgoingMessage.Response', index=0, + name='answer', full_name='stan.QuestionResponse.answer', index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='WasHandled', full_name='stan.OutgoingMessage.WasHandled', index=1, + name='was_answered', full_name='stan.QuestionResponse.was_answered', index=1, number=2, type=8, cpp_type=7, label=1, has_default_value=False, default_value=False, message_type=None, enum_type=None, containing_type=None, @@ -90,26 +90,26 @@ oneofs=[ ], serialized_start=57, - serialized_end=112, + serialized_end=113, ) -DESCRIPTOR.message_types_by_name['IncomingMessage'] = _INCOMINGMESSAGE -DESCRIPTOR.message_types_by_name['OutgoingMessage'] = _OUTGOINGMESSAGE +DESCRIPTOR.message_types_by_name['QuestionRequest'] = _QUESTIONREQUEST +DESCRIPTOR.message_types_by_name['QuestionResponse'] = _QUESTIONRESPONSE _sym_db.RegisterFileDescriptor(DESCRIPTOR) -IncomingMessage = _reflection.GeneratedProtocolMessageType('IncomingMessage', (_message.Message,), dict( - DESCRIPTOR = _INCOMINGMESSAGE, +QuestionRequest = _reflection.GeneratedProtocolMessageType('QuestionRequest', (_message.Message,), dict( + DESCRIPTOR = _QUESTIONREQUEST, __module__ = 'stan_pb2' - # @@protoc_insertion_point(class_scope:stan.IncomingMessage) + # @@protoc_insertion_point(class_scope:stan.QuestionRequest) )) -_sym_db.RegisterMessage(IncomingMessage) +_sym_db.RegisterMessage(QuestionRequest) -OutgoingMessage = _reflection.GeneratedProtocolMessageType('OutgoingMessage', (_message.Message,), dict( - DESCRIPTOR = _OUTGOINGMESSAGE, +QuestionResponse = _reflection.GeneratedProtocolMessageType('QuestionResponse', (_message.Message,), dict( + DESCRIPTOR = _QUESTIONRESPONSE, __module__ = 'stan_pb2' - # @@protoc_insertion_point(class_scope:stan.OutgoingMessage) + # @@protoc_insertion_point(class_scope:stan.QuestionResponse) )) -_sym_db.RegisterMessage(OutgoingMessage) +_sym_db.RegisterMessage(QuestionResponse) @@ -119,16 +119,16 @@ file=DESCRIPTOR, index=0, serialized_options=None, - serialized_start=114, - serialized_end=180, + serialized_start=115, + serialized_end=185, methods=[ _descriptor.MethodDescriptor( - name='SayHello', - full_name='stan.Stan.SayHello', + name='AskQuestion', + full_name='stan.Stan.AskQuestion', index=0, containing_service=None, - input_type=_INCOMINGMESSAGE, - output_type=_OUTGOINGMESSAGE, + input_type=_QUESTIONREQUEST, + output_type=_QUESTIONRESPONSE, serialized_options=None, ), ]) diff --git a/tests/apps/grpc_server/stan_pb2_grpc.py b/tests/apps/grpc_server/stan_pb2_grpc.py index 696942b6..3bedfd51 100644 --- a/tests/apps/grpc_server/stan_pb2_grpc.py +++ b/tests/apps/grpc_server/stan_pb2_grpc.py @@ -14,10 +14,10 @@ def __init__(self, channel): Args: channel: A grpc.Channel. """ - self.SayHello = channel.unary_unary( - '/stan.Stan/SayHello', - request_serializer=stan__pb2.IncomingMessage.SerializeToString, - response_deserializer=stan__pb2.OutgoingMessage.FromString, + self.AskQuestion = channel.unary_unary( + '/stan.Stan/AskQuestion', + request_serializer=stan__pb2.QuestionRequest.SerializeToString, + response_deserializer=stan__pb2.QuestionResponse.FromString, ) @@ -25,7 +25,7 @@ class StanServicer(object): # missing associated documentation comment in .proto file pass - def SayHello(self, request, context): + def AskQuestion(self, request, context): # missing associated documentation comment in .proto file pass context.set_code(grpc.StatusCode.UNIMPLEMENTED) @@ -35,10 +35,10 @@ def SayHello(self, request, context): def add_StanServicer_to_server(servicer, server): rpc_method_handlers = { - 'SayHello': grpc.unary_unary_rpc_method_handler( - servicer.SayHello, - request_deserializer=stan__pb2.IncomingMessage.FromString, - response_serializer=stan__pb2.OutgoingMessage.SerializeToString, + 'AskQuestion': grpc.unary_unary_rpc_method_handler( + servicer.AskQuestion, + request_deserializer=stan__pb2.QuestionRequest.FromString, + response_serializer=stan__pb2.QuestionResponse.SerializeToString, ), } generic_handler = grpc.method_handlers_generic_handler( diff --git a/tests/apps/grpc_server/stan_server.py b/tests/apps/grpc_server/stan_server.py index ee1c23e9..a3a0a1b8 100644 --- a/tests/apps/grpc_server/stan_server.py +++ b/tests/apps/grpc_server/stan_server.py @@ -1,37 +1,31 @@ import grpc import time -import hashlib import tests.apps.grpc_server.stan_pb2 as stan_pb2 import tests.apps.grpc_server.stan_pb2_grpc as stan_pb2_grpc from concurrent import futures + class StanServicer(stan_pb2_grpc.StanServicer): """ gRPC server for Stan Service """ def __init__(self, *args, **kwargs): - self.server_port = 5003 + self.server_port = 5030 - def SayHello(self, request, context): - """ - Implementation of the rpc SayHello declared in the proto - file above. - """ + def AskQuestion(self, request, context): # get the string from the incoming request - to_be_digested = request.SayHello + question = request.question - # digest and get the string representation - # from the digestor - hasher = hashlib.sha256() - hasher.update(to_be_digested.encode()) - response = hasher.hexdigest() + response = """\ +Invention, my dear friends, is 93% perspiration, 6% electricity, \ +4% evaporation, and 2% butterscotch ripple. – Willy Wonka""" - # print the output here - print(response) + # # print the output here + # print(response)54a - result = {'Digested': response, 'WasDigested': True} + result = {'answer': response, 'was_answered': True} - return stan_pb2.OutgoingMessage(**result) + return stan_pb2.QuestionResponse(**result) def start_server(self): """ @@ -40,17 +34,16 @@ def start_server(self): """ # declare a server object with desired number # of thread pool workers. - digestor_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + rpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) # This line can be ignored - stan_pb2_grpc.add_StanServicer_to_server(StanServicer(),digestor_server) + stan_pb2_grpc.add_StanServicer_to_server(StanServicer(), rpc_server) # bind the server to the port defined above - digestor_server.add_insecure_port('[::]:{}'.format(self.server_port)) + rpc_server.add_insecure_port('[::]:{}'.format(self.server_port)) # start the server - digestor_server.start() - print ('Stan as a Service RPC Server running ...') + rpc_server.start() try: # need an infinite loop since the above @@ -59,6 +52,6 @@ def start_server(self): while True: time.sleep(60*60*60) except KeyboardInterrupt: - digestor_server.stop(0) + rpc_server.stop(0) print('Stan as a Service RPC Server Stopped ...') From 4e7dc8686225785f963a025ecd82f1a080bf3855 Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Mon, 15 Jul 2019 18:27:51 +0200 Subject: [PATCH 05/15] Initial GRPC instrumentation and tests --- instana/__init__.py | 1 + instana/instrumentation/grpcio.py | 79 ++++++++++++++++++ tests/test_grpcio.py | 128 ++++++++++++++++++++++++++++++ 3 files changed, 208 insertions(+) create mode 100644 instana/instrumentation/grpcio.py create mode 100644 tests/test_grpcio.py diff --git a/instana/__init__.py b/instana/__init__.py index 867be0e8..ad457014 100644 --- a/instana/__init__.py +++ b/instana/__init__.py @@ -69,6 +69,7 @@ def boot_agent(): from .instrumentation import mysqlclient from .instrumentation import flask + from .instrumentation import grpcio from .instrumentation.tornado import client from .instrumentation.tornado import server from .instrumentation import logging diff --git a/instana/instrumentation/grpcio.py b/instana/instrumentation/grpcio.py new file mode 100644 index 00000000..a6ea391c --- /dev/null +++ b/instana/instrumentation/grpcio.py @@ -0,0 +1,79 @@ +from __future__ import absolute_import + +import wrapt +import opentracing + +from ..log import logger +from ..singletons import tracer + +try: + import grpc + + + @wrapt.patch_function_wrapper('grpc._channel', '_UnaryUnaryMultiCallable.with_call') + def with_call_with_instana(wrapped, instance, argv, kwargs): + parent_span = tracer.active_span + + # If we're not tracing, just return + if parent_span is None or parent_span.operation_name == "rpc-client": + return wrapped(*argv, **kwargs) + + with tracer.start_active_span("rpc-client", child_of=parent_span) as scope: + try: + if "metadata" not in kwargs: + kwargs["metadata"] = [] + + kwargs["metadata"] = tracer.inject(scope.span.context, opentracing.Format.BINARY, kwargs['metadata']) + + rv = wrapped(*argv, **kwargs) + except Exception as e: + scope.span.log_exception(e) + raise + else: + return rv + + + @wrapt.patch_function_wrapper('grpc._channel', '_UnaryUnaryMultiCallable.__call__') + def call_with_instana(wrapped, instance, argv, kwargs): + parent_span = tracer.active_span + + # If we're not tracing, just return + if parent_span is None or parent_span.operation_name == "rpc-client": + return wrapped(*argv, **kwargs) + + with tracer.start_active_span("rpc-client", child_of=parent_span) as scope: + try: + if not "metadata" in kwargs: + kwargs["metadata"] = [] + + kwargs["metadata"] = tracer.inject(scope.span.context, opentracing.Format.BINARY, kwargs['metadata']) + + rv = wrapped(*argv, **kwargs) + except Exception as e: + scope.span.log_exception(e) + raise + else: + return rv + + + @wrapt.patch_function_wrapper('grpc._server', '_call_behavior') + def call_behavior_with_instana(wrapped, instance, argv, kwargs): + # Prep any incoming context headers + metadata = argv[0].invocation_metadata + metadata_dict = {} + for c in metadata: + metadata_dict[c.key] = c.value + + ctx = tracer.extract(opentracing.Format.BINARY, metadata_dict) + + with tracer.start_active_span("rpc-server", child_of=ctx) as scope: + try: + rv = wrapped(*argv, **kwargs) + except Exception as e: + scope.span.log_exception(e) + raise + else: + return rv + +except ImportError: + pass diff --git a/tests/test_grpcio.py b/tests/test_grpcio.py new file mode 100644 index 00000000..5cf6985d --- /dev/null +++ b/tests/test_grpcio.py @@ -0,0 +1,128 @@ +from __future__ import absolute_import + +import time +import unittest + +import grpc + +import tests.apps.grpc_server.stan_pb2 as stan_pb2 +import tests.apps.grpc_server.stan_pb2_grpc as stan_pb2_grpc + +from instana.singletons import tracer +from .helpers import testenv + + +class TestGRPCIO(unittest.TestCase): + def setUp(self): + """ Clear all spans before a test run """ + self.recorder = tracer.recorder + self.recorder.clear_spans() + self.channel = grpc.insecure_channel('127.0.0.1:5030') + self.server_stub = stan_pb2_grpc.StanStub(self.channel) + # The grpc client apparently needs a second to connect and initialize + time.sleep(1) + + def tearDown(self): + """ Do nothing for now """ + pass + + def test_vanilla_request(self): + response = self.server_stub.AskQuestion(stan_pb2.QuestionRequest(question="Are you there?")) + self.assertEqual(response.answer, "Invention, my dear friends, is 93% perspiration, 6% electricity, 4% evaporation, and 2% butterscotch ripple. – Willy Wonka") + + def test_vanilla_request_via_with_call(self): + response = self.server_stub.AskQuestion.with_call(stan_pb2.QuestionRequest(question="Are you there?")) + self.assertEqual(response[0].answer, "Invention, my dear friends, is 93% perspiration, 6% electricity, 4% evaporation, and 2% butterscotch ripple. – Willy Wonka") + + def test_request(self): + with tracer.start_active_span('test'): + response = self.server_stub.AskQuestion(stan_pb2.QuestionRequest(question="Are you there?")) + + self.assertIsNone(tracer.active_span) + self.assertIsNotNone(response) + self.assertEqual(response.answer, "Invention, my dear friends, is 93% perspiration, 6% electricity, 4% evaporation, and 2% butterscotch ripple. – Willy Wonka") + + spans = self.recorder.queued_spans() + self.assertEqual(3, len(spans)) + + server_span = spans[0] + client_span = spans[1] + test_span = spans[2] + + # Same traceId + self.assertEqual(server_span.t, client_span.t) + self.assertEqual(server_span.t, test_span.t) + + # Parent relationships + self.assertEqual(server_span.p, client_span.s) + self.assertEqual(client_span.p, test_span.s) + + # Error logging + self.assertFalse(test_span.error) + self.assertIsNone(test_span.ec) + self.assertFalse(client_span.error) + self.assertIsNone(client_span.ec) + self.assertFalse(server_span.error) + self.assertIsNone(server_span.ec) + + # rpc-server + self.assertEqual(server_span.n, 'rpc-server') + self.assertEqual(server_span.k, 1) + self.assertIsNotNone(server_span.stack) + self.assertEqual(2, len(server_span.stack)) + + # rpc-client + self.assertEqual(client_span.n, 'rpc-client') + self.assertEqual(client_span.k, 2) + self.assertIsNotNone(client_span.stack) + + # test-span + self.assertEqual(test_span.n, 'sdk') + self.assertEqual(test_span.data.sdk.name, 'test') + + def test_request_via_with_call(self): + with tracer.start_active_span('test'): + response = self.server_stub.AskQuestion.with_call(stan_pb2.QuestionRequest(question="Are you there?")) + + self.assertIsNone(tracer.active_span) + self.assertIsNotNone(response) + self.assertEqual(type(response), tuple) + self.assertEqual(response[0].answer, "Invention, my dear friends, is 93% perspiration, 6% electricity, 4% evaporation, and 2% butterscotch ripple. – Willy Wonka") + + spans = self.recorder.queued_spans() + self.assertEqual(3, len(spans)) + + server_span = spans[0] + client_span = spans[1] + test_span = spans[2] + + # Same traceId + self.assertEqual(server_span.t, client_span.t) + self.assertEqual(server_span.t, test_span.t) + + # Parent relationships + self.assertEqual(server_span.p, client_span.s) + self.assertEqual(client_span.p, test_span.s) + + # Error logging + self.assertFalse(test_span.error) + self.assertIsNone(test_span.ec) + self.assertFalse(client_span.error) + self.assertIsNone(client_span.ec) + self.assertFalse(server_span.error) + self.assertIsNone(server_span.ec) + + # rpc-server + self.assertEqual(server_span.n, 'rpc-server') + self.assertEqual(server_span.k, 1) + self.assertIsNotNone(server_span.stack) + self.assertEqual(2, len(server_span.stack)) + + # rpc-client + self.assertEqual(client_span.n, 'rpc-client') + self.assertEqual(client_span.k, 2) + self.assertIsNotNone(client_span.stack) + + # test-span + self.assertEqual(test_span.n, 'sdk') + self.assertEqual(test_span.data.sdk.name, 'test') From def316b05e08db49b8ee49a0fd8bf232b24a06cb Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Mon, 15 Jul 2019 18:33:58 +0200 Subject: [PATCH 06/15] Update tests to follow InstanaSpanContext changes --- tests/test_ot_propagators.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/test_ot_propagators.py b/tests/test_ot_propagators.py index 7417b3d8..e6c4742f 100644 --- a/tests/test_ot_propagators.py +++ b/tests/test_ot_propagators.py @@ -6,7 +6,7 @@ import instana.http_propagator as ihp import instana.text_propagator as itp -from instana import options, util +from instana import options, span_context from instana.tracer import InstanaTracer @@ -58,7 +58,7 @@ def test_http_basic_extract(): carrier = {'X-Instana-T': '1', 'X-Instana-S': '1', 'X-Instana-L': '1'} ctx = ot.tracer.extract(ot.Format.HTTP_HEADERS, carrier) - assert type(ctx) is basictracer.context.SpanContext + assert type(ctx) is span_context.InstanaSpanContext assert_equals('0000000000000001', ctx.trace_id) assert_equals('0000000000000001', ctx.span_id) @@ -70,7 +70,7 @@ def test_http_mixed_case_extract(): carrier = {'x-insTana-T': '1', 'X-inSTANa-S': '1', 'X-INstana-l': '1'} ctx = ot.tracer.extract(ot.Format.HTTP_HEADERS, carrier) - assert type(ctx) is basictracer.context.SpanContext + assert type(ctx) is span_context.InstanaSpanContext assert_equals('0000000000000001', ctx.trace_id) assert_equals('0000000000000001', ctx.span_id) @@ -93,7 +93,7 @@ def test_http_128bit_headers(): 'X-Instana-S': '0000000000000000b0789916ff8f319f', 'X-Instana-L': '1'} ctx = ot.tracer.extract(ot.Format.HTTP_HEADERS, carrier) - assert type(ctx) is basictracer.context.SpanContext + assert type(ctx) is span_context.InstanaSpanContext assert_equals('b0789916ff8f319f', ctx.trace_id) assert_equals('b0789916ff8f319f', ctx.span_id) @@ -146,7 +146,7 @@ def test_text_basic_extract(): carrier = {'X-INSTANA-T': '1', 'X-INSTANA-S': '1', 'X-INSTANA-L': '1'} ctx = ot.tracer.extract(ot.Format.TEXT_MAP, carrier) - assert type(ctx) is basictracer.context.SpanContext + assert type(ctx) is span_context.InstanaSpanContext assert_equals('0000000000000001', ctx.trace_id) assert_equals('0000000000000001', ctx.span_id) @@ -179,6 +179,6 @@ def test_text_128bit_headers(): 'X-INSTANA-S': ' 0000000000000000b0789916ff8f319f', 'X-INSTANA-L': '1'} ctx = ot.tracer.extract(ot.Format.TEXT_MAP, carrier) - assert type(ctx) is basictracer.context.SpanContext + assert type(ctx) is span_context.InstanaSpanContext assert_equals('b0789916ff8f319f', ctx.trace_id) assert_equals('b0789916ff8f319f', ctx.span_id) From cd92f4b812abbf8d04e0af7d2a32f1422d4441ff Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Mon, 15 Jul 2019 22:09:23 +0200 Subject: [PATCH 07/15] Update MariaDB to fix py2.7 build issue with Mysql-python --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 158094d0..c93f4c4d 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -12,7 +12,7 @@ jobs: # CircleCI maintains a library of pre-built images # documented at https://circleci.com/docs/2.0/circleci-images/ - image: circleci/postgres:9.6.5-alpine-ram - - image: circleci/mariadb:10-ram + - image: circleci/mariadb:10.1-ram - image: circleci/redis:5.0.4 - image: rabbitmq:3.5.4 From dd82208807e7e06ce45de708ab03b446364a1916 Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Thu, 18 Jul 2019 13:24:58 +0200 Subject: [PATCH 08/15] Updated GRPC app with various streaming rpc call support --- tests/apps/grpc_server/README.md | 4 +- tests/apps/grpc_server/stan.proto | 5 +- tests/apps/grpc_server/stan_pb2.py | 37 +++++++++++++-- tests/apps/grpc_server/stan_pb2_grpc.py | 62 +++++++++++++++++++++++-- tests/apps/grpc_server/stan_server.py | 37 ++++++++++++--- 5 files changed, 125 insertions(+), 20 deletions(-) diff --git a/tests/apps/grpc_server/README.md b/tests/apps/grpc_server/README.md index c2fdddff..a02cdf3f 100644 --- a/tests/apps/grpc_server/README.md +++ b/tests/apps/grpc_server/README.md @@ -2,7 +2,7 @@ To regenerate from the proto file: ```bash pip install grpcio grpcio-tools -python -m grpc_tools.protoc --proto_path=. ./stanaas.proto --python_out=. --grpc_python_out=. +python -m grpc_tools.protoc --proto_path=. --python_out=. --grpc_python_out=. ./stan.proto ``` -Inspired from: https://technokeeda.com/programming/grpc-python-tutorial/ \ No newline at end of file +Inspired by: https://technokeeda.com/programming/grpc-python-tutorial/ \ No newline at end of file diff --git a/tests/apps/grpc_server/stan.proto b/tests/apps/grpc_server/stan.proto index 46e977fb..f61542d9 100644 --- a/tests/apps/grpc_server/stan.proto +++ b/tests/apps/grpc_server/stan.proto @@ -3,7 +3,10 @@ syntax = "proto3"; package stan; service Stan{ - rpc AskQuestion(QuestionRequest) returns (QuestionResponse) {} + rpc OneQuestionOneResponse(QuestionRequest) returns (QuestionResponse) {} + rpc ManyQuestionsOneResponse(stream QuestionRequest) returns (QuestionResponse){} + rpc OneQuestionManyResponses(QuestionRequest) returns (stream QuestionResponse){} + rpc ManyQuestionsManyReponses(stream QuestionRequest) returns (stream QuestionResponse){} } diff --git a/tests/apps/grpc_server/stan_pb2.py b/tests/apps/grpc_server/stan_pb2.py index 25a7678c..9a9dbc76 100644 --- a/tests/apps/grpc_server/stan_pb2.py +++ b/tests/apps/grpc_server/stan_pb2.py @@ -19,7 +19,7 @@ package='stan', syntax='proto3', serialized_options=None, - serialized_pb=_b('\n\nstan.proto\x12\x04stan\"#\n\x0fQuestionRequest\x12\x10\n\x08question\x18\x01 \x01(\t\"8\n\x10QuestionResponse\x12\x0e\n\x06\x61nswer\x18\x01 \x01(\t\x12\x14\n\x0cwas_answered\x18\x02 \x01(\x08\x32\x46\n\x04Stan\x12>\n\x0b\x41skQuestion\x12\x15.stan.QuestionRequest\x1a\x16.stan.QuestionResponse\"\x00\x62\x06proto3') + serialized_pb=_b('\n\nstan.proto\x12\x04stan\"#\n\x0fQuestionRequest\x12\x10\n\x08question\x18\x01 \x01(\t\"8\n\x10QuestionResponse\x12\x0e\n\x06\x61nswer\x18\x01 \x01(\t\x12\x14\n\x0cwas_answered\x18\x02 \x01(\x08\x32\xc1\x02\n\x04Stan\x12I\n\x16OneQuestionOneResponse\x12\x15.stan.QuestionRequest\x1a\x16.stan.QuestionResponse\"\x00\x12M\n\x18ManyQuestionsOneResponse\x12\x15.stan.QuestionRequest\x1a\x16.stan.QuestionResponse\"\x00(\x01\x12M\n\x18OneQuestionManyResponses\x12\x15.stan.QuestionRequest\x1a\x16.stan.QuestionResponse\"\x00\x30\x01\x12P\n\x19ManyQuestionsManyReponses\x12\x15.stan.QuestionRequest\x1a\x16.stan.QuestionResponse\"\x00(\x01\x30\x01\x62\x06proto3') ) @@ -119,18 +119,45 @@ file=DESCRIPTOR, index=0, serialized_options=None, - serialized_start=115, - serialized_end=185, + serialized_start=116, + serialized_end=437, methods=[ _descriptor.MethodDescriptor( - name='AskQuestion', - full_name='stan.Stan.AskQuestion', + name='OneQuestionOneResponse', + full_name='stan.Stan.OneQuestionOneResponse', index=0, containing_service=None, input_type=_QUESTIONREQUEST, output_type=_QUESTIONRESPONSE, serialized_options=None, ), + _descriptor.MethodDescriptor( + name='ManyQuestionsOneResponse', + full_name='stan.Stan.ManyQuestionsOneResponse', + index=1, + containing_service=None, + input_type=_QUESTIONREQUEST, + output_type=_QUESTIONRESPONSE, + serialized_options=None, + ), + _descriptor.MethodDescriptor( + name='OneQuestionManyResponses', + full_name='stan.Stan.OneQuestionManyResponses', + index=2, + containing_service=None, + input_type=_QUESTIONREQUEST, + output_type=_QUESTIONRESPONSE, + serialized_options=None, + ), + _descriptor.MethodDescriptor( + name='ManyQuestionsManyReponses', + full_name='stan.Stan.ManyQuestionsManyReponses', + index=3, + containing_service=None, + input_type=_QUESTIONREQUEST, + output_type=_QUESTIONRESPONSE, + serialized_options=None, + ), ]) _sym_db.RegisterServiceDescriptor(_STAN) diff --git a/tests/apps/grpc_server/stan_pb2_grpc.py b/tests/apps/grpc_server/stan_pb2_grpc.py index 3bedfd51..cfca182e 100644 --- a/tests/apps/grpc_server/stan_pb2_grpc.py +++ b/tests/apps/grpc_server/stan_pb2_grpc.py @@ -4,6 +4,7 @@ import tests.apps.grpc_server.stan_pb2 as stan__pb2 + class StanStub(object): # missing associated documentation comment in .proto file pass @@ -14,8 +15,23 @@ def __init__(self, channel): Args: channel: A grpc.Channel. """ - self.AskQuestion = channel.unary_unary( - '/stan.Stan/AskQuestion', + self.OneQuestionOneResponse = channel.unary_unary( + '/stan.Stan/OneQuestionOneResponse', + request_serializer=stan__pb2.QuestionRequest.SerializeToString, + response_deserializer=stan__pb2.QuestionResponse.FromString, + ) + self.ManyQuestionsOneResponse = channel.stream_unary( + '/stan.Stan/ManyQuestionsOneResponse', + request_serializer=stan__pb2.QuestionRequest.SerializeToString, + response_deserializer=stan__pb2.QuestionResponse.FromString, + ) + self.OneQuestionManyResponses = channel.unary_stream( + '/stan.Stan/OneQuestionManyResponses', + request_serializer=stan__pb2.QuestionRequest.SerializeToString, + response_deserializer=stan__pb2.QuestionResponse.FromString, + ) + self.ManyQuestionsManyReponses = channel.stream_stream( + '/stan.Stan/ManyQuestionsManyReponses', request_serializer=stan__pb2.QuestionRequest.SerializeToString, response_deserializer=stan__pb2.QuestionResponse.FromString, ) @@ -25,7 +41,28 @@ class StanServicer(object): # missing associated documentation comment in .proto file pass - def AskQuestion(self, request, context): + def OneQuestionOneResponse(self, request, context): + # missing associated documentation comment in .proto file + pass + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def ManyQuestionsOneResponse(self, request_iterator, context): + # missing associated documentation comment in .proto file + pass + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def OneQuestionManyResponses(self, request, context): + # missing associated documentation comment in .proto file + pass + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def ManyQuestionsManyReponses(self, request_iterator, context): # missing associated documentation comment in .proto file pass context.set_code(grpc.StatusCode.UNIMPLEMENTED) @@ -35,8 +72,23 @@ def AskQuestion(self, request, context): def add_StanServicer_to_server(servicer, server): rpc_method_handlers = { - 'AskQuestion': grpc.unary_unary_rpc_method_handler( - servicer.AskQuestion, + 'OneQuestionOneResponse': grpc.unary_unary_rpc_method_handler( + servicer.OneQuestionOneResponse, + request_deserializer=stan__pb2.QuestionRequest.FromString, + response_serializer=stan__pb2.QuestionResponse.SerializeToString, + ), + 'ManyQuestionsOneResponse': grpc.stream_unary_rpc_method_handler( + servicer.ManyQuestionsOneResponse, + request_deserializer=stan__pb2.QuestionRequest.FromString, + response_serializer=stan__pb2.QuestionResponse.SerializeToString, + ), + 'OneQuestionManyResponses': grpc.unary_stream_rpc_method_handler( + servicer.OneQuestionManyResponses, + request_deserializer=stan__pb2.QuestionRequest.FromString, + response_serializer=stan__pb2.QuestionResponse.SerializeToString, + ), + 'ManyQuestionsManyReponses': grpc.stream_stream_rpc_method_handler( + servicer.ManyQuestionsManyReponses, request_deserializer=stan__pb2.QuestionRequest.FromString, response_serializer=stan__pb2.QuestionResponse.SerializeToString, ), diff --git a/tests/apps/grpc_server/stan_server.py b/tests/apps/grpc_server/stan_server.py index a3a0a1b8..abf8f2d6 100644 --- a/tests/apps/grpc_server/stan_server.py +++ b/tests/apps/grpc_server/stan_server.py @@ -4,29 +4,52 @@ import tests.apps.grpc_server.stan_pb2_grpc as stan_pb2_grpc from concurrent import futures +from ...helpers import testenv + +testenv["grpc_port"] = 10814 +testenv["grpc_host"] = "127.0.0.1" +testenv["grpc_server"] = testenv["grpc_host"] + ":" + str(testenv["grpc_port"]) + class StanServicer(stan_pb2_grpc.StanServicer): """ gRPC server for Stan Service """ def __init__(self, *args, **kwargs): - self.server_port = 5030 + self.server_port = testenv['grpc_port'] - def AskQuestion(self, request, context): - # get the string from the incoming request - question = request.question + def OneQuestionOneResponse(self, request, context): + # print("😇:I was asked: %s" % request.question) response = """\ Invention, my dear friends, is 93% perspiration, 6% electricity, \ 4% evaporation, and 2% butterscotch ripple. – Willy Wonka""" - # # print the output here - # print(response)54a - result = {'answer': response, 'was_answered': True} return stan_pb2.QuestionResponse(**result) + def ManyQuestionsOneResponse(self, request_iterator, context): + for request in request_iterator: + # print("😇:I was asked: %s" % request.question) + pass + + result = {'answer': 'Ok', 'was_answered': True} + return stan_pb2.QuestionResponse(**result) + + def OneQuestionManyResponses(self, request, context): + # print("😇:I was asked: %s" % request.question) + + for count in range(6): + result = {'answer': 'Ok', 'was_answered': True} + yield stan_pb2.QuestionResponse(**result) + + def ManyQuestionsManyReponses(self, request_iterator, context): + for request in request_iterator: + # print("😇:I was asked: %s" % request.question) + result = {'answer': 'Ok', 'was_answered': True} + yield stan_pb2.QuestionResponse(**result) + def start_server(self): """ Function which actually starts the gRPC server, and preps From 10d7e2a4bba2bd3dd696f338f7528830e1450b7c Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Thu, 18 Jul 2019 13:25:36 +0200 Subject: [PATCH 09/15] GRPC instrumentation: streaming call suport --- instana/instrumentation/grpcio.py | 140 +++++++++++++++++++++++++++++- 1 file changed, 136 insertions(+), 4 deletions(-) diff --git a/instana/instrumentation/grpcio.py b/instana/instrumentation/grpcio.py index a6ea391c..39b1e479 100644 --- a/instana/instrumentation/grpcio.py +++ b/instana/instrumentation/grpcio.py @@ -8,14 +8,46 @@ try: import grpc + from grpc._channel import _UnaryUnaryMultiCallable, _StreamUnaryMultiCallable, \ + _UnaryStreamMultiCallable, _StreamStreamMultiCallable + + SUPPORTED_TYPES = [ _UnaryUnaryMultiCallable, + _StreamUnaryMultiCallable, + _UnaryStreamMultiCallable, + _StreamStreamMultiCallable ] + + def collect_tags(span, instance, argv, kwargs): + try: + span.set_tag('rpc.flavor', 'grpc') + + if type(instance) in SUPPORTED_TYPES: + method = instance._method.decode() + target = instance._channel.target().decode() + elif type(argv[0]) is grpc._cython.cygrpc.RequestCallEvent: + method = argv[0].call_details.method.decode() + target = argv[0].call_details.host.decode() + elif len(argv) > 2: + method = argv[2][2][1]._method.decode() + target = argv[2][2][1]._channel.target().decode() + + span.set_tag('rpc.call', method) + + parts = target.split(':') + if len(parts) == 2: + span.set_tag('rpc.host', parts[0]) + span.set_tag('rpc.port', parts[1]) + except: + logger.debug("grpc.collect_tags non-fatal error", exc_info=True) + finally: + return span @wrapt.patch_function_wrapper('grpc._channel', '_UnaryUnaryMultiCallable.with_call') - def with_call_with_instana(wrapped, instance, argv, kwargs): + def unary_unary_with_call_with_instana(wrapped, instance, argv, kwargs): parent_span = tracer.active_span # If we're not tracing, just return - if parent_span is None or parent_span.operation_name == "rpc-client": + if parent_span is None: return wrapped(*argv, **kwargs) with tracer.start_active_span("rpc-client", child_of=parent_span) as scope: @@ -24,6 +56,8 @@ def with_call_with_instana(wrapped, instance, argv, kwargs): kwargs["metadata"] = [] kwargs["metadata"] = tracer.inject(scope.span.context, opentracing.Format.BINARY, kwargs['metadata']) + collect_tags(scope.span, instance, argv, kwargs) + scope.span.set_tag('rpc.call_type', 'unary') rv = wrapped(*argv, **kwargs) except Exception as e: @@ -34,11 +68,11 @@ def with_call_with_instana(wrapped, instance, argv, kwargs): @wrapt.patch_function_wrapper('grpc._channel', '_UnaryUnaryMultiCallable.__call__') - def call_with_instana(wrapped, instance, argv, kwargs): + def unary_unary_call_with_instana(wrapped, instance, argv, kwargs): parent_span = tracer.active_span # If we're not tracing, just return - if parent_span is None or parent_span.operation_name == "rpc-client": + if parent_span is None: return wrapped(*argv, **kwargs) with tracer.start_active_span("rpc-client", child_of=parent_span) as scope: @@ -47,6 +81,8 @@ def call_with_instana(wrapped, instance, argv, kwargs): kwargs["metadata"] = [] kwargs["metadata"] = tracer.inject(scope.span.context, opentracing.Format.BINARY, kwargs['metadata']) + collect_tags(scope.span, instance, argv, kwargs) + scope.span.set_tag('rpc.call_type', 'unary') rv = wrapped(*argv, **kwargs) except Exception as e: @@ -55,6 +91,101 @@ def call_with_instana(wrapped, instance, argv, kwargs): else: return rv + @wrapt.patch_function_wrapper('grpc._channel', '_StreamUnaryMultiCallable.__call__') + def stream_unary_call_with_instana(wrapped, instance, argv, kwargs): + parent_span = tracer.active_span + + # If we're not tracing, just return + if parent_span is None: + return wrapped(*argv, **kwargs) + + with tracer.start_active_span("rpc-client", child_of=parent_span) as scope: + try: + if not "metadata" in kwargs: + kwargs["metadata"] = [] + + kwargs["metadata"] = tracer.inject(scope.span.context, opentracing.Format.BINARY, kwargs['metadata']) + collect_tags(scope.span, instance, argv, kwargs) + scope.span.set_tag('rpc.call_type', 'stream') + + rv = wrapped(*argv, **kwargs) + except Exception as e: + scope.span.log_exception(e) + raise + else: + return rv + + @wrapt.patch_function_wrapper('grpc._channel', '_StreamUnaryMultiCallable.with_call') + def stream_unary_with_call_with_instana(wrapped, instance, argv, kwargs): + parent_span = tracer.active_span + + # If we're not tracing, just return + if parent_span is None: + return wrapped(*argv, **kwargs) + + with tracer.start_active_span("rpc-client", child_of=parent_span) as scope: + try: + if not "metadata" in kwargs: + kwargs["metadata"] = [] + + kwargs["metadata"] = tracer.inject(scope.span.context, opentracing.Format.BINARY, kwargs['metadata']) + collect_tags(scope.span, instance, argv, kwargs) + scope.span.set_tag('rpc.call_type', 'stream') + + rv = wrapped(*argv, **kwargs) + except Exception as e: + scope.span.log_exception(e) + raise + else: + return rv + + @wrapt.patch_function_wrapper('grpc._channel', '_UnaryStreamMultiCallable.__call__') + def unary_stream_call_with_instana(wrapped, instance, argv, kwargs): + parent_span = tracer.active_span + + # If we're not tracing, just return + if parent_span is None: + return wrapped(*argv, **kwargs) + + with tracer.start_active_span("rpc-client", child_of=parent_span) as scope: + try: + if not "metadata" in kwargs: + kwargs["metadata"] = [] + + kwargs["metadata"] = tracer.inject(scope.span.context, opentracing.Format.BINARY, kwargs['metadata']) + collect_tags(scope.span, instance, argv, kwargs) + scope.span.set_tag('rpc.call_type', 'stream') + + rv = wrapped(*argv, **kwargs) + except Exception as e: + scope.span.log_exception(e) + raise + else: + return rv + + @wrapt.patch_function_wrapper('grpc._channel', '_StreamStreamMultiCallable.__call__') + def stream_stream_call_with_instana(wrapped, instance, argv, kwargs): + parent_span = tracer.active_span + + # If we're not tracing, just return + if parent_span is None: + return wrapped(*argv, **kwargs) + + with tracer.start_active_span("rpc-client", child_of=parent_span) as scope: + try: + if not "metadata" in kwargs: + kwargs["metadata"] = [] + + kwargs["metadata"] = tracer.inject(scope.span.context, opentracing.Format.BINARY, kwargs['metadata']) + collect_tags(scope.span, instance, argv, kwargs) + scope.span.set_tag('rpc.call_type', 'stream') + + rv = wrapped(*argv, **kwargs) + except Exception as e: + scope.span.log_exception(e) + raise + else: + return rv @wrapt.patch_function_wrapper('grpc._server', '_call_behavior') def call_behavior_with_instana(wrapped, instance, argv, kwargs): @@ -68,6 +199,7 @@ def call_behavior_with_instana(wrapped, instance, argv, kwargs): with tracer.start_active_span("rpc-server", child_of=ctx) as scope: try: + collect_tags(scope.span, instance, argv, kwargs) rv = wrapped(*argv, **kwargs) except Exception as e: scope.span.log_exception(e) From 37b7e73ff4e8dced5cff37ae33bd115b37dbc50b Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Thu, 18 Jul 2019 13:25:59 +0200 Subject: [PATCH 10/15] Streaming GRPC tests --- tests/test_grpcio.py | 304 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 290 insertions(+), 14 deletions(-) diff --git a/tests/test_grpcio.py b/tests/test_grpcio.py index 5cf6985d..64b007cc 100644 --- a/tests/test_grpcio.py +++ b/tests/test_grpcio.py @@ -2,6 +2,7 @@ import time import unittest +import random import grpc @@ -9,7 +10,7 @@ import tests.apps.grpc_server.stan_pb2_grpc as stan_pb2_grpc from instana.singletons import tracer -from .helpers import testenv +from .helpers import testenv, get_first_span_by_name class TestGRPCIO(unittest.TestCase): @@ -17,7 +18,7 @@ def setUp(self): """ Clear all spans before a test run """ self.recorder = tracer.recorder self.recorder.clear_spans() - self.channel = grpc.insecure_channel('127.0.0.1:5030') + self.channel = grpc.insecure_channel(testenv["grpc_server"]) self.server_stub = stan_pb2_grpc.StanStub(self.channel) # The grpc client apparently needs a second to connect and initialize time.sleep(1) @@ -26,17 +27,31 @@ def tearDown(self): """ Do nothing for now """ pass + def generate_questions(self): + """ Used in the streaming grpc tests """ + questions = [ + stan_pb2.QuestionRequest(question="Are you there?"), + stan_pb2.QuestionRequest(question="What time is it?"), + stan_pb2.QuestionRequest(question="Where in the world is Waldo?"), + stan_pb2.QuestionRequest(question="What did one campfire say to the other?"), + stan_pb2.QuestionRequest(question="Is cereal soup?"), + stan_pb2.QuestionRequest(question="What is always coming, but never arrives?") + ] + for q in questions: + yield q + time.sleep(random.uniform(0.3, 0.7)) + def test_vanilla_request(self): - response = self.server_stub.AskQuestion(stan_pb2.QuestionRequest(question="Are you there?")) + response = self.server_stub.OneQuestionOneResponse(stan_pb2.QuestionRequest(question="Are you there?")) self.assertEqual(response.answer, "Invention, my dear friends, is 93% perspiration, 6% electricity, 4% evaporation, and 2% butterscotch ripple. – Willy Wonka") def test_vanilla_request_via_with_call(self): - response = self.server_stub.AskQuestion.with_call(stan_pb2.QuestionRequest(question="Are you there?")) + response = self.server_stub.OneQuestionOneResponse.with_call(stan_pb2.QuestionRequest(question="Are you there?")) self.assertEqual(response[0].answer, "Invention, my dear friends, is 93% perspiration, 6% electricity, 4% evaporation, and 2% butterscotch ripple. – Willy Wonka") - def test_request(self): + def test_unary_one_to_one(self): with tracer.start_active_span('test'): - response = self.server_stub.AskQuestion(stan_pb2.QuestionRequest(question="Are you there?")) + response = self.server_stub.OneQuestionOneResponse(stan_pb2.QuestionRequest(question="Are you there?")) self.assertIsNone(tracer.active_span) self.assertIsNotNone(response) @@ -45,9 +60,191 @@ def test_request(self): spans = self.recorder.queued_spans() self.assertEqual(3, len(spans)) - server_span = spans[0] - client_span = spans[1] - test_span = spans[2] + server_span = get_first_span_by_name(spans, 'rpc-server') + client_span = get_first_span_by_name(spans, 'rpc-client') + test_span = get_first_span_by_name(spans, 'sdk') + + # Same traceId + self.assertEqual(server_span.t, client_span.t) + self.assertEqual(server_span.t, test_span.t) + + # Parent relationships + self.assertEqual(server_span.p, client_span.s) + self.assertEqual(client_span.p, test_span.s) + + # Error logging + self.assertFalse(test_span.error) + self.assertIsNone(test_span.ec) + self.assertFalse(client_span.error) + self.assertIsNone(client_span.ec) + self.assertFalse(server_span.error) + self.assertIsNone(server_span.ec) + + # rpc-server + self.assertEqual(server_span.n, 'rpc-server') + self.assertEqual(server_span.k, 1) + self.assertIsNotNone(server_span.stack) + self.assertEqual(2, len(server_span.stack)) + self.assertEqual(server_span.data.rpc.flavor, 'grpc') + self.assertEqual(server_span.data.rpc.call, '/stan.Stan/OneQuestionOneResponse') + self.assertEqual(server_span.data.rpc.host, testenv["grpc_host"]) + self.assertEqual(server_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertIsNone(server_span.data.rpc.error) + + # rpc-client + self.assertEqual(client_span.n, 'rpc-client') + self.assertEqual(client_span.k, 2) + self.assertIsNotNone(client_span.stack) + self.assertEqual(client_span.data.rpc.flavor, 'grpc') + self.assertEqual(client_span.data.rpc.call, '/stan.Stan/OneQuestionOneResponse') + self.assertEqual(client_span.data.rpc.host, testenv["grpc_host"]) + self.assertEqual(client_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertIsNone(client_span.data.rpc.error) + + # test-span + self.assertEqual(test_span.n, 'sdk') + self.assertEqual(test_span.data.sdk.name, 'test') + + def test_streaming_many_to_one(self): + + with tracer.start_active_span('test'): + response = self.server_stub.ManyQuestionsOneResponse(self.generate_questions()) + + self.assertIsNone(tracer.active_span) + self.assertIsNotNone(response) + + self.assertEqual('Ok', response.answer) + self.assertEqual(True, response.was_answered) + + spans = self.recorder.queued_spans() + self.assertEqual(3, len(spans)) + + server_span = get_first_span_by_name(spans, 'rpc-server') + client_span = get_first_span_by_name(spans, 'rpc-client') + test_span = get_first_span_by_name(spans, 'sdk') + + # Same traceId + self.assertEqual(server_span.t, client_span.t) + self.assertEqual(server_span.t, test_span.t) + + # Parent relationships + self.assertEqual(server_span.p, client_span.s) + self.assertEqual(client_span.p, test_span.s) + + # Error logging + self.assertFalse(test_span.error) + self.assertIsNone(test_span.ec) + self.assertFalse(client_span.error) + self.assertIsNone(client_span.ec) + self.assertFalse(server_span.error) + self.assertIsNone(server_span.ec) + + # rpc-server + self.assertEqual(server_span.n, 'rpc-server') + self.assertEqual(server_span.k, 1) + self.assertIsNotNone(server_span.stack) + self.assertEqual(2, len(server_span.stack)) + self.assertEqual(server_span.data.rpc.flavor, 'grpc') + self.assertEqual(server_span.data.rpc.call, '/stan.Stan/ManyQuestionsOneResponse') + self.assertEqual(server_span.data.rpc.host, testenv["grpc_host"]) + self.assertEqual(server_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertIsNone(server_span.data.rpc.error) + + # rpc-client + self.assertEqual(client_span.n, 'rpc-client') + self.assertEqual(client_span.k, 2) + self.assertIsNotNone(client_span.stack) + self.assertEqual(client_span.data.rpc.flavor, 'grpc') + self.assertEqual(client_span.data.rpc.call, '/stan.Stan/ManyQuestionsOneResponse') + self.assertEqual(client_span.data.rpc.host, testenv["grpc_host"]) + self.assertEqual(client_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertIsNone(client_span.data.rpc.error) + + # test-span + self.assertEqual(test_span.n, 'sdk') + self.assertEqual(test_span.data.sdk.name, 'test') + + def test_streaming_one_to_many(self): + + with tracer.start_active_span('test'): + responses = self.server_stub.OneQuestionManyResponses(stan_pb2.QuestionRequest(question="Are you there?")) + + self.assertIsNone(tracer.active_span) + self.assertIsNotNone(responses) + + final_answers = [] + for response in responses: + final_answers.append(response) + + self.assertEqual(len(final_answers), 6) + + spans = self.recorder.queued_spans() + self.assertEqual(3, len(spans)) + + server_span = get_first_span_by_name(spans, 'rpc-server') + client_span = get_first_span_by_name(spans, 'rpc-client') + test_span = get_first_span_by_name(spans, 'sdk') + + # Same traceId + self.assertEqual(server_span.t, client_span.t) + self.assertEqual(server_span.t, test_span.t) + + # Parent relationships + self.assertEqual(server_span.p, client_span.s) + self.assertEqual(client_span.p, test_span.s) + + # Error logging + self.assertFalse(test_span.error) + self.assertIsNone(test_span.ec) + self.assertFalse(client_span.error) + self.assertIsNone(client_span.ec) + self.assertFalse(server_span.error) + self.assertIsNone(server_span.ec) + + # rpc-server + self.assertEqual(server_span.n, 'rpc-server') + self.assertEqual(server_span.k, 1) + self.assertIsNotNone(server_span.stack) + self.assertEqual(2, len(server_span.stack)) + self.assertEqual(server_span.data.rpc.flavor, 'grpc') + self.assertEqual(server_span.data.rpc.call, '/stan.Stan/OneQuestionManyResponses') + self.assertEqual(server_span.data.rpc.host, testenv["grpc_host"]) + self.assertEqual(server_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertIsNone(server_span.data.rpc.error) + + # rpc-client + self.assertEqual(client_span.n, 'rpc-client') + self.assertEqual(client_span.k, 2) + self.assertIsNotNone(client_span.stack) + self.assertEqual(client_span.data.rpc.flavor, 'grpc') + self.assertEqual(client_span.data.rpc.call, '/stan.Stan/OneQuestionManyResponses') + self.assertEqual(client_span.data.rpc.host, testenv["grpc_host"]) + self.assertEqual(client_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertIsNone(client_span.data.rpc.error) + + # test-span + self.assertEqual(test_span.n, 'sdk') + self.assertEqual(test_span.data.sdk.name, 'test') + + def test_streaming_many_to_many(self): + with tracer.start_active_span('test'): + responses = self.server_stub.ManyQuestionsManyReponses(self.generate_questions()) + + self.assertIsNone(tracer.active_span) + self.assertIsNotNone(responses) + + final_answers = [] + for response in responses: + final_answers.append(response) + + self.assertEqual(len(final_answers), 6) + + spans = self.recorder.queued_spans() + self.assertEqual(3, len(spans)) + + server_span = get_first_span_by_name(spans, 'rpc-server') + client_span = get_first_span_by_name(spans, 'rpc-client') + test_span = get_first_span_by_name(spans, 'sdk') # Same traceId self.assertEqual(server_span.t, client_span.t) @@ -70,19 +267,29 @@ def test_request(self): self.assertEqual(server_span.k, 1) self.assertIsNotNone(server_span.stack) self.assertEqual(2, len(server_span.stack)) + self.assertEqual(server_span.data.rpc.flavor, 'grpc') + self.assertEqual(server_span.data.rpc.call, '/stan.Stan/ManyQuestionsManyReponses') + self.assertEqual(server_span.data.rpc.host, testenv["grpc_host"]) + self.assertEqual(server_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertIsNone(server_span.data.rpc.error) # rpc-client self.assertEqual(client_span.n, 'rpc-client') self.assertEqual(client_span.k, 2) self.assertIsNotNone(client_span.stack) + self.assertEqual(client_span.data.rpc.flavor, 'grpc') + self.assertEqual(client_span.data.rpc.call, '/stan.Stan/ManyQuestionsManyReponses') + self.assertEqual(client_span.data.rpc.host, testenv["grpc_host"]) + self.assertEqual(client_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertIsNone(client_span.data.rpc.error) # test-span self.assertEqual(test_span.n, 'sdk') self.assertEqual(test_span.data.sdk.name, 'test') - def test_request_via_with_call(self): + def test_unary_one_to_one_with_call(self): with tracer.start_active_span('test'): - response = self.server_stub.AskQuestion.with_call(stan_pb2.QuestionRequest(question="Are you there?")) + response = self.server_stub.OneQuestionOneResponse.with_call(stan_pb2.QuestionRequest(question="Are you there?")) self.assertIsNone(tracer.active_span) self.assertIsNotNone(response) @@ -92,9 +299,9 @@ def test_request_via_with_call(self): spans = self.recorder.queued_spans() self.assertEqual(3, len(spans)) - server_span = spans[0] - client_span = spans[1] - test_span = spans[2] + server_span = get_first_span_by_name(spans, 'rpc-server') + client_span = get_first_span_by_name(spans, 'rpc-client') + test_span = get_first_span_by_name(spans, 'sdk') # Same traceId self.assertEqual(server_span.t, client_span.t) @@ -117,12 +324,81 @@ def test_request_via_with_call(self): self.assertEqual(server_span.k, 1) self.assertIsNotNone(server_span.stack) self.assertEqual(2, len(server_span.stack)) + self.assertEqual(server_span.data.rpc.flavor, 'grpc') + self.assertEqual(server_span.data.rpc.call, '/stan.Stan/OneQuestionOneResponse') + self.assertEqual(server_span.data.rpc.host, testenv["grpc_host"]) + self.assertEqual(server_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertIsNone(server_span.data.rpc.error) # rpc-client self.assertEqual(client_span.n, 'rpc-client') self.assertEqual(client_span.k, 2) self.assertIsNotNone(client_span.stack) + self.assertEqual(client_span.data.rpc.flavor, 'grpc') + self.assertEqual(client_span.data.rpc.call, '/stan.Stan/OneQuestionOneResponse') + self.assertEqual(client_span.data.rpc.host, testenv["grpc_host"]) + self.assertEqual(client_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertIsNone(client_span.data.rpc.error) # test-span self.assertEqual(test_span.n, 'sdk') self.assertEqual(test_span.data.sdk.name, 'test') + + def test_streaming_many_to_one_with_call(self): + with tracer.start_active_span('test'): + response = self.server_stub.ManyQuestionsOneResponse.with_call(self.generate_questions()) + + self.assertIsNone(tracer.active_span) + self.assertIsNotNone(response) + + self.assertEqual('Ok', response[0].answer) + self.assertEqual(True, response[0].was_answered) + + spans = self.recorder.queued_spans() + self.assertEqual(3, len(spans)) + + server_span = get_first_span_by_name(spans, 'rpc-server') + client_span = get_first_span_by_name(spans, 'rpc-client') + test_span = get_first_span_by_name(spans, 'sdk') + + # Same traceId + self.assertEqual(server_span.t, client_span.t) + self.assertEqual(server_span.t, test_span.t) + + # Parent relationships + self.assertEqual(server_span.p, client_span.s) + self.assertEqual(client_span.p, test_span.s) + + # Error logging + self.assertFalse(test_span.error) + self.assertIsNone(test_span.ec) + self.assertFalse(client_span.error) + self.assertIsNone(client_span.ec) + self.assertFalse(server_span.error) + self.assertIsNone(server_span.ec) + + # rpc-server + self.assertEqual(server_span.n, 'rpc-server') + self.assertEqual(server_span.k, 1) + self.assertIsNotNone(server_span.stack) + self.assertEqual(2, len(server_span.stack)) + self.assertEqual(server_span.data.rpc.flavor, 'grpc') + self.assertEqual(server_span.data.rpc.call, '/stan.Stan/ManyQuestionsOneResponse') + self.assertEqual(server_span.data.rpc.host, testenv["grpc_host"]) + self.assertEqual(server_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertIsNone(server_span.data.rpc.error) + + # rpc-client + self.assertEqual(client_span.n, 'rpc-client') + self.assertEqual(client_span.k, 2) + self.assertIsNotNone(client_span.stack) + self.assertEqual(client_span.data.rpc.flavor, 'grpc') + self.assertEqual(client_span.data.rpc.call, '/stan.Stan/ManyQuestionsOneResponse') + self.assertEqual(client_span.data.rpc.host, testenv["grpc_host"]) + self.assertEqual(client_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertIsNone(client_span.data.rpc.error) + + # test-span + self.assertEqual(test_span.n, 'sdk') + self.assertEqual(test_span.data.sdk.name, 'test') + From b2bdc444d69daa267ca7f581c18735178b985c10 Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Thu, 18 Jul 2019 16:16:08 +0200 Subject: [PATCH 11/15] GRPC Async/Futures support with tests --- instana/instrumentation/grpcio.py | 47 ++++++++ tests/__init__.py | 1 - tests/apps/grpc_server/stan_server.py | 3 - tests/test_grpcio.py | 167 +++++++++++++++++++++++++- 4 files changed, 213 insertions(+), 5 deletions(-) diff --git a/instana/instrumentation/grpcio.py b/instana/instrumentation/grpcio.py index 39b1e479..d5f898dd 100644 --- a/instana/instrumentation/grpcio.py +++ b/instana/instrumentation/grpcio.py @@ -66,6 +66,29 @@ def unary_unary_with_call_with_instana(wrapped, instance, argv, kwargs): else: return rv + @wrapt.patch_function_wrapper('grpc._channel', '_UnaryUnaryMultiCallable.future') + def unary_unary_future_with_instana(wrapped, instance, argv, kwargs): + parent_span = tracer.active_span + + # If we're not tracing, just return + if parent_span is None: + return wrapped(*argv, **kwargs) + + with tracer.start_active_span("rpc-client", child_of=parent_span) as scope: + try: + if "metadata" not in kwargs: + kwargs["metadata"] = [] + + kwargs["metadata"] = tracer.inject(scope.span.context, opentracing.Format.BINARY, kwargs['metadata']) + collect_tags(scope.span, instance, argv, kwargs) + scope.span.set_tag('rpc.call_type', 'unary') + + rv = wrapped(*argv, **kwargs) + except Exception as e: + scope.span.log_exception(e) + raise + else: + return rv @wrapt.patch_function_wrapper('grpc._channel', '_UnaryUnaryMultiCallable.__call__') def unary_unary_call_with_instana(wrapped, instance, argv, kwargs): @@ -139,6 +162,30 @@ def stream_unary_with_call_with_instana(wrapped, instance, argv, kwargs): else: return rv + @wrapt.patch_function_wrapper('grpc._channel', '_StreamUnaryMultiCallable.future') + def stream_unary_future_with_instana(wrapped, instance, argv, kwargs): + parent_span = tracer.active_span + + # If we're not tracing, just return + if parent_span is None: + return wrapped(*argv, **kwargs) + + with tracer.start_active_span("rpc-client", child_of=parent_span) as scope: + try: + if not "metadata" in kwargs: + kwargs["metadata"] = [] + + kwargs["metadata"] = tracer.inject(scope.span.context, opentracing.Format.BINARY, kwargs['metadata']) + collect_tags(scope.span, instance, argv, kwargs) + scope.span.set_tag('rpc.call_type', 'stream') + + rv = wrapped(*argv, **kwargs) + except Exception as e: + scope.span.log_exception(e) + raise + else: + return rv + @wrapt.patch_function_wrapper('grpc._channel', '_UnaryStreamMultiCallable.__call__') def unary_stream_call_with_instana(wrapped, instance, argv, kwargs): parent_span = tracer.active_span diff --git a/tests/__init__.py b/tests/__init__.py index 8e92c707..76fe07d3 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -33,7 +33,6 @@ print("Starting background RPC app...") rpc_server_thread.start() - if sys.version_info < (3, 7, 0): # Background Soap Server from .apps.soapserver4132 import soapserver diff --git a/tests/apps/grpc_server/stan_server.py b/tests/apps/grpc_server/stan_server.py index abf8f2d6..44b9a6b6 100644 --- a/tests/apps/grpc_server/stan_server.py +++ b/tests/apps/grpc_server/stan_server.py @@ -20,13 +20,10 @@ def __init__(self, *args, **kwargs): def OneQuestionOneResponse(self, request, context): # print("😇:I was asked: %s" % request.question) - response = """\ Invention, my dear friends, is 93% perspiration, 6% electricity, \ 4% evaporation, and 2% butterscotch ripple. – Willy Wonka""" - result = {'answer': response, 'was_answered': True} - return stan_pb2.QuestionResponse(**result) def ManyQuestionsOneResponse(self, request_iterator, context): diff --git a/tests/test_grpcio.py b/tests/test_grpcio.py index 64b007cc..106e750d 100644 --- a/tests/test_grpcio.py +++ b/tests/test_grpcio.py @@ -39,7 +39,7 @@ def generate_questions(self): ] for q in questions: yield q - time.sleep(random.uniform(0.3, 0.7)) + time.sleep(random.uniform(0.2, 0.5)) def test_vanilla_request(self): response = self.server_stub.OneQuestionOneResponse(stan_pb2.QuestionRequest(question="Are you there?")) @@ -64,6 +64,10 @@ def test_unary_one_to_one(self): client_span = get_first_span_by_name(spans, 'rpc-client') test_span = get_first_span_by_name(spans, 'sdk') + assert(server_span) + assert(client_span) + assert(test_span) + # Same traceId self.assertEqual(server_span.t, client_span.t) self.assertEqual(server_span.t, test_span.t) @@ -99,6 +103,7 @@ def test_unary_one_to_one(self): self.assertEqual(client_span.data.rpc.call, '/stan.Stan/OneQuestionOneResponse') self.assertEqual(client_span.data.rpc.host, testenv["grpc_host"]) self.assertEqual(client_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertEqual(client_span.data.rpc.call_type, 'unary') self.assertIsNone(client_span.data.rpc.error) # test-span @@ -123,6 +128,10 @@ def test_streaming_many_to_one(self): client_span = get_first_span_by_name(spans, 'rpc-client') test_span = get_first_span_by_name(spans, 'sdk') + assert(server_span) + assert(client_span) + assert(test_span) + # Same traceId self.assertEqual(server_span.t, client_span.t) self.assertEqual(server_span.t, test_span.t) @@ -158,6 +167,7 @@ def test_streaming_many_to_one(self): self.assertEqual(client_span.data.rpc.call, '/stan.Stan/ManyQuestionsOneResponse') self.assertEqual(client_span.data.rpc.host, testenv["grpc_host"]) self.assertEqual(client_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertEqual(client_span.data.rpc.call_type, 'stream') self.assertIsNone(client_span.data.rpc.error) # test-span @@ -185,6 +195,10 @@ def test_streaming_one_to_many(self): client_span = get_first_span_by_name(spans, 'rpc-client') test_span = get_first_span_by_name(spans, 'sdk') + assert(server_span) + assert(client_span) + assert(test_span) + # Same traceId self.assertEqual(server_span.t, client_span.t) self.assertEqual(server_span.t, test_span.t) @@ -220,6 +234,7 @@ def test_streaming_one_to_many(self): self.assertEqual(client_span.data.rpc.call, '/stan.Stan/OneQuestionManyResponses') self.assertEqual(client_span.data.rpc.host, testenv["grpc_host"]) self.assertEqual(client_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertEqual(client_span.data.rpc.call_type, 'stream') self.assertIsNone(client_span.data.rpc.error) # test-span @@ -246,6 +261,10 @@ def test_streaming_many_to_many(self): client_span = get_first_span_by_name(spans, 'rpc-client') test_span = get_first_span_by_name(spans, 'sdk') + assert(server_span) + assert(client_span) + assert(test_span) + # Same traceId self.assertEqual(server_span.t, client_span.t) self.assertEqual(server_span.t, test_span.t) @@ -281,6 +300,7 @@ def test_streaming_many_to_many(self): self.assertEqual(client_span.data.rpc.call, '/stan.Stan/ManyQuestionsManyReponses') self.assertEqual(client_span.data.rpc.host, testenv["grpc_host"]) self.assertEqual(client_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertEqual(client_span.data.rpc.call_type, 'stream') self.assertIsNone(client_span.data.rpc.error) # test-span @@ -303,6 +323,10 @@ def test_unary_one_to_one_with_call(self): client_span = get_first_span_by_name(spans, 'rpc-client') test_span = get_first_span_by_name(spans, 'sdk') + assert(server_span) + assert(client_span) + assert(test_span) + # Same traceId self.assertEqual(server_span.t, client_span.t) self.assertEqual(server_span.t, test_span.t) @@ -338,6 +362,7 @@ def test_unary_one_to_one_with_call(self): self.assertEqual(client_span.data.rpc.call, '/stan.Stan/OneQuestionOneResponse') self.assertEqual(client_span.data.rpc.host, testenv["grpc_host"]) self.assertEqual(client_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertEqual(client_span.data.rpc.call_type, 'unary') self.assertIsNone(client_span.data.rpc.error) # test-span @@ -361,6 +386,10 @@ def test_streaming_many_to_one_with_call(self): client_span = get_first_span_by_name(spans, 'rpc-client') test_span = get_first_span_by_name(spans, 'sdk') + assert(server_span) + assert(client_span) + assert(test_span) + # Same traceId self.assertEqual(server_span.t, client_span.t) self.assertEqual(server_span.t, test_span.t) @@ -396,9 +425,145 @@ def test_streaming_many_to_one_with_call(self): self.assertEqual(client_span.data.rpc.call, '/stan.Stan/ManyQuestionsOneResponse') self.assertEqual(client_span.data.rpc.host, testenv["grpc_host"]) self.assertEqual(client_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertEqual(client_span.data.rpc.call_type, 'stream') + self.assertIsNone(client_span.data.rpc.error) + + # test-span + self.assertEqual(test_span.n, 'sdk') + self.assertEqual(test_span.data.sdk.name, 'test') + + def test_async_unary(self): + def process_response(future): + result = future.result() + self.assertEqual(type(result), stan_pb2.QuestionResponse) + self.assertTrue(result.was_answered) + self.assertEqual(result.answer, "Invention, my dear friends, is 93% perspiration, 6% electricity, 4% evaporation, and 2% butterscotch ripple. – Willy Wonka") + + with tracer.start_active_span('test'): + future = self.server_stub.OneQuestionOneResponse.future( + stan_pb2.QuestionRequest(question="Are you there?")) + future.add_done_callback(process_response) + time.sleep(0.7) + + self.assertIsNone(tracer.active_span) + spans = self.recorder.queued_spans() + self.assertEqual(3, len(spans)) + + server_span = get_first_span_by_name(spans, 'rpc-server') + client_span = get_first_span_by_name(spans, 'rpc-client') + test_span = get_first_span_by_name(spans, 'sdk') + + assert(server_span) + assert(client_span) + assert(test_span) + + # Same traceId + self.assertEqual(server_span.t, client_span.t) + self.assertEqual(server_span.t, test_span.t) + + # Parent relationships + self.assertEqual(server_span.p, client_span.s) + self.assertEqual(client_span.p, test_span.s) + + # Error logging + self.assertFalse(test_span.error) + self.assertIsNone(test_span.ec) + self.assertFalse(client_span.error) + self.assertIsNone(client_span.ec) + self.assertFalse(server_span.error) + self.assertIsNone(server_span.ec) + + # rpc-server + self.assertEqual(server_span.n, 'rpc-server') + self.assertEqual(server_span.k, 1) + self.assertIsNotNone(server_span.stack) + self.assertEqual(2, len(server_span.stack)) + self.assertEqual(server_span.data.rpc.flavor, 'grpc') + self.assertEqual(server_span.data.rpc.call, '/stan.Stan/OneQuestionOneResponse') + self.assertEqual(server_span.data.rpc.host, testenv["grpc_host"]) + self.assertEqual(server_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertIsNone(server_span.data.rpc.error) + + # rpc-client + self.assertEqual(client_span.n, 'rpc-client') + self.assertEqual(client_span.k, 2) + self.assertIsNotNone(client_span.stack) + self.assertEqual(client_span.data.rpc.flavor, 'grpc') + self.assertEqual(client_span.data.rpc.call, '/stan.Stan/OneQuestionOneResponse') + self.assertEqual(client_span.data.rpc.host, testenv["grpc_host"]) + self.assertEqual(client_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertEqual(client_span.data.rpc.call_type, 'unary') self.assertIsNone(client_span.data.rpc.error) # test-span self.assertEqual(test_span.n, 'sdk') self.assertEqual(test_span.data.sdk.name, 'test') + def test_async_stream(self): + def process_response(future): + result = future.result() + self.assertEqual(type(result), stan_pb2.QuestionResponse) + self.assertTrue(result.was_answered) + self.assertEqual(result.answer, 'Ok') + + with tracer.start_active_span('test'): + future = self.server_stub.ManyQuestionsOneResponse.future(self.generate_questions()) + future.add_done_callback(process_response) + + # The question generator delays at random intervals between questions so to assure that + # all questions are sent and processed before we start testing the results. + time.sleep(5) + + self.assertIsNone(tracer.active_span) + spans = self.recorder.queued_spans() + self.assertEqual(3, len(spans)) + + server_span = get_first_span_by_name(spans, 'rpc-server') + client_span = get_first_span_by_name(spans, 'rpc-client') + test_span = get_first_span_by_name(spans, 'sdk') + + assert(server_span) + assert(client_span) + assert(test_span) + + # Same traceId + self.assertEqual(server_span.t, client_span.t) + self.assertEqual(server_span.t, test_span.t) + + # Parent relationships + self.assertEqual(server_span.p, client_span.s) + self.assertEqual(client_span.p, test_span.s) + + # Error logging + self.assertFalse(test_span.error) + self.assertIsNone(test_span.ec) + self.assertFalse(client_span.error) + self.assertIsNone(client_span.ec) + self.assertFalse(server_span.error) + self.assertIsNone(server_span.ec) + + # rpc-server + self.assertEqual(server_span.n, 'rpc-server') + self.assertEqual(server_span.k, 1) + self.assertIsNotNone(server_span.stack) + self.assertEqual(2, len(server_span.stack)) + self.assertEqual(server_span.data.rpc.flavor, 'grpc') + self.assertEqual(server_span.data.rpc.call, '/stan.Stan/ManyQuestionsOneResponse') + self.assertEqual(server_span.data.rpc.host, testenv["grpc_host"]) + self.assertEqual(server_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertIsNone(server_span.data.rpc.error) + + # rpc-client + self.assertEqual(client_span.n, 'rpc-client') + self.assertEqual(client_span.k, 2) + self.assertIsNotNone(client_span.stack) + self.assertEqual(client_span.data.rpc.flavor, 'grpc') + self.assertEqual(client_span.data.rpc.call, '/stan.Stan/ManyQuestionsOneResponse') + self.assertEqual(client_span.data.rpc.host, testenv["grpc_host"]) + self.assertEqual(client_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertEqual(client_span.data.rpc.call_type, 'stream') + self.assertIsNone(client_span.data.rpc.error) + + # test-span + self.assertEqual(test_span.n, 'sdk') + self.assertEqual(test_span.data.sdk.name, 'test') From 91948e3843728e3470ec480560b92ba7a809f713 Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Thu, 18 Jul 2019 18:05:12 +0200 Subject: [PATCH 12/15] Improved exception logging --- instana/span.py | 30 ++++++++++++++++++++++-------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/instana/span.py b/instana/span.py index c0efb4b8..402084e0 100644 --- a/instana/span.py +++ b/instana/span.py @@ -1,4 +1,5 @@ from basictracer.span import BasicSpan +from .log import logger class InstanaSpan(BasicSpan): @@ -8,11 +9,24 @@ def finish(self, finish_time=None): super(InstanaSpan, self).finish(finish_time) def log_exception(self, e): - if hasattr(e, '__str__'): - self.log_kv({'message': str(e)}) - elif hasattr(e, 'message') and e.message is not None: - self.log_kv({'message': e.message}) - - self.set_tag("error", True) - ec = self.tags.get('ec', 0) - self.set_tag("ec", ec+1) + try: + message = "" + + self.set_tag("error", True) + ec = self.tags.get('ec', 0) + self.set_tag("ec", ec+1) + + if hasattr(e, '__str__'): + message = str(e) + elif hasattr(e, 'message') and e.message is not None: + message = e.message + + if self.operation_name in ['rpc-server', 'rpc-client']: + self.set_tag('rpc.error', message) + + self.log_kv({'message': message}) + + except Exception: + logger.debug("span.log_exception", exc_info=True) + raise + From d8e77d596a32274cedeaf9cf7c65f7aa2cef49ec Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Thu, 18 Jul 2019 18:05:38 +0200 Subject: [PATCH 13/15] Test cases for error reporting --- tests/apps/grpc_server/stan.proto | 7 +++ tests/apps/grpc_server/stan_pb2.py | 22 +++++++- tests/apps/grpc_server/stan_pb2_grpc.py | 43 +++++++++++++-- tests/apps/grpc_server/stan_server.py | 7 ++- tests/test_grpcio.py | 71 +++++++++++++++++++++++++ 5 files changed, 142 insertions(+), 8 deletions(-) diff --git a/tests/apps/grpc_server/stan.proto b/tests/apps/grpc_server/stan.proto index f61542d9..78084159 100644 --- a/tests/apps/grpc_server/stan.proto +++ b/tests/apps/grpc_server/stan.proto @@ -3,10 +3,17 @@ syntax = "proto3"; package stan; service Stan{ + // Unary rpc OneQuestionOneResponse(QuestionRequest) returns (QuestionResponse) {} + + // Streaming rpc ManyQuestionsOneResponse(stream QuestionRequest) returns (QuestionResponse){} rpc OneQuestionManyResponses(QuestionRequest) returns (stream QuestionResponse){} rpc ManyQuestionsManyReponses(stream QuestionRequest) returns (stream QuestionResponse){} + + // Error Testing + rpc OneQuestionOneErrorResponse(QuestionRequest) returns (QuestionResponse) {} + rpc OneErroredQuestionOneResponse(QuestionRequest) returns (QuestionResponse) {} } diff --git a/tests/apps/grpc_server/stan_pb2.py b/tests/apps/grpc_server/stan_pb2.py index 9a9dbc76..28b6cf69 100644 --- a/tests/apps/grpc_server/stan_pb2.py +++ b/tests/apps/grpc_server/stan_pb2.py @@ -19,7 +19,7 @@ package='stan', syntax='proto3', serialized_options=None, - serialized_pb=_b('\n\nstan.proto\x12\x04stan\"#\n\x0fQuestionRequest\x12\x10\n\x08question\x18\x01 \x01(\t\"8\n\x10QuestionResponse\x12\x0e\n\x06\x61nswer\x18\x01 \x01(\t\x12\x14\n\x0cwas_answered\x18\x02 \x01(\x08\x32\xc1\x02\n\x04Stan\x12I\n\x16OneQuestionOneResponse\x12\x15.stan.QuestionRequest\x1a\x16.stan.QuestionResponse\"\x00\x12M\n\x18ManyQuestionsOneResponse\x12\x15.stan.QuestionRequest\x1a\x16.stan.QuestionResponse\"\x00(\x01\x12M\n\x18OneQuestionManyResponses\x12\x15.stan.QuestionRequest\x1a\x16.stan.QuestionResponse\"\x00\x30\x01\x12P\n\x19ManyQuestionsManyReponses\x12\x15.stan.QuestionRequest\x1a\x16.stan.QuestionResponse\"\x00(\x01\x30\x01\x62\x06proto3') + serialized_pb=_b('\n\nstan.proto\x12\x04stan\"#\n\x0fQuestionRequest\x12\x10\n\x08question\x18\x01 \x01(\t\"8\n\x10QuestionResponse\x12\x0e\n\x06\x61nswer\x18\x01 \x01(\t\x12\x14\n\x0cwas_answered\x18\x02 \x01(\x08\x32\xe3\x03\n\x04Stan\x12I\n\x16OneQuestionOneResponse\x12\x15.stan.QuestionRequest\x1a\x16.stan.QuestionResponse\"\x00\x12M\n\x18ManyQuestionsOneResponse\x12\x15.stan.QuestionRequest\x1a\x16.stan.QuestionResponse\"\x00(\x01\x12M\n\x18OneQuestionManyResponses\x12\x15.stan.QuestionRequest\x1a\x16.stan.QuestionResponse\"\x00\x30\x01\x12P\n\x19ManyQuestionsManyReponses\x12\x15.stan.QuestionRequest\x1a\x16.stan.QuestionResponse\"\x00(\x01\x30\x01\x12N\n\x1bOneQuestionOneErrorResponse\x12\x15.stan.QuestionRequest\x1a\x16.stan.QuestionResponse\"\x00\x12P\n\x1dOneErroredQuestionOneResponse\x12\x15.stan.QuestionRequest\x1a\x16.stan.QuestionResponse\"\x00\x62\x06proto3') ) @@ -120,7 +120,7 @@ index=0, serialized_options=None, serialized_start=116, - serialized_end=437, + serialized_end=599, methods=[ _descriptor.MethodDescriptor( name='OneQuestionOneResponse', @@ -158,6 +158,24 @@ output_type=_QUESTIONRESPONSE, serialized_options=None, ), + _descriptor.MethodDescriptor( + name='OneQuestionOneErrorResponse', + full_name='stan.Stan.OneQuestionOneErrorResponse', + index=4, + containing_service=None, + input_type=_QUESTIONREQUEST, + output_type=_QUESTIONRESPONSE, + serialized_options=None, + ), + _descriptor.MethodDescriptor( + name='OneErroredQuestionOneResponse', + full_name='stan.Stan.OneErroredQuestionOneResponse', + index=5, + containing_service=None, + input_type=_QUESTIONREQUEST, + output_type=_QUESTIONRESPONSE, + serialized_options=None, + ), ]) _sym_db.RegisterServiceDescriptor(_STAN) diff --git a/tests/apps/grpc_server/stan_pb2_grpc.py b/tests/apps/grpc_server/stan_pb2_grpc.py index cfca182e..61643119 100644 --- a/tests/apps/grpc_server/stan_pb2_grpc.py +++ b/tests/apps/grpc_server/stan_pb2_grpc.py @@ -4,7 +4,6 @@ import tests.apps.grpc_server.stan_pb2 as stan__pb2 - class StanStub(object): # missing associated documentation comment in .proto file pass @@ -35,6 +34,16 @@ def __init__(self, channel): request_serializer=stan__pb2.QuestionRequest.SerializeToString, response_deserializer=stan__pb2.QuestionResponse.FromString, ) + self.OneQuestionOneErrorResponse = channel.unary_unary( + '/stan.Stan/OneQuestionOneErrorResponse', + request_serializer=stan__pb2.QuestionRequest.SerializeToString, + response_deserializer=stan__pb2.QuestionResponse.FromString, + ) + self.OneErroredQuestionOneResponse = channel.unary_unary( + '/stan.Stan/OneErroredQuestionOneResponse', + request_serializer=stan__pb2.QuestionRequest.SerializeToString, + response_deserializer=stan__pb2.QuestionResponse.FromString, + ) class StanServicer(object): @@ -42,15 +51,15 @@ class StanServicer(object): pass def OneQuestionOneResponse(self, request, context): - # missing associated documentation comment in .proto file - pass + """Unary + """ context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') def ManyQuestionsOneResponse(self, request_iterator, context): - # missing associated documentation comment in .proto file - pass + """Streaming + """ context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') @@ -69,6 +78,20 @@ def ManyQuestionsManyReponses(self, request_iterator, context): context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') + def OneQuestionOneErrorResponse(self, request, context): + """Error Testing + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def OneErroredQuestionOneResponse(self, request, context): + # missing associated documentation comment in .proto file + pass + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + def add_StanServicer_to_server(servicer, server): rpc_method_handlers = { @@ -92,6 +115,16 @@ def add_StanServicer_to_server(servicer, server): request_deserializer=stan__pb2.QuestionRequest.FromString, response_serializer=stan__pb2.QuestionResponse.SerializeToString, ), + 'OneQuestionOneErrorResponse': grpc.unary_unary_rpc_method_handler( + servicer.OneQuestionOneErrorResponse, + request_deserializer=stan__pb2.QuestionRequest.FromString, + response_serializer=stan__pb2.QuestionResponse.SerializeToString, + ), + 'OneErroredQuestionOneResponse': grpc.unary_unary_rpc_method_handler( + servicer.OneErroredQuestionOneResponse, + request_deserializer=stan__pb2.QuestionRequest.FromString, + response_serializer=stan__pb2.QuestionResponse.SerializeToString, + ), } generic_handler = grpc.method_handlers_generic_handler( 'stan.Stan', rpc_method_handlers) diff --git a/tests/apps/grpc_server/stan_server.py b/tests/apps/grpc_server/stan_server.py index 44b9a6b6..cada6673 100644 --- a/tests/apps/grpc_server/stan_server.py +++ b/tests/apps/grpc_server/stan_server.py @@ -36,7 +36,6 @@ def ManyQuestionsOneResponse(self, request_iterator, context): def OneQuestionManyResponses(self, request, context): # print("😇:I was asked: %s" % request.question) - for count in range(6): result = {'answer': 'Ok', 'was_answered': True} yield stan_pb2.QuestionResponse(**result) @@ -47,6 +46,12 @@ def ManyQuestionsManyReponses(self, request_iterator, context): result = {'answer': 'Ok', 'was_answered': True} yield stan_pb2.QuestionResponse(**result) + def OneQuestionOneErrorResponse(self, request, context): + # print("😇:I was asked: %s" % request.question) + raise Exception('Simulated error') + result = {'answer': "ThisError", 'was_answered': True} + return stan_pb2.QuestionResponse(**result) + def start_server(self): """ Function which actually starts the gRPC server, and preps diff --git a/tests/test_grpcio.py b/tests/test_grpcio.py index 106e750d..493c78fa 100644 --- a/tests/test_grpcio.py +++ b/tests/test_grpcio.py @@ -567,3 +567,74 @@ def process_response(future): # test-span self.assertEqual(test_span.n, 'sdk') self.assertEqual(test_span.data.sdk.name, 'test') + + def test_server_error(self): + try: + response = None + with tracer.start_active_span('test'): + response = self.server_stub.OneQuestionOneErrorResponse(stan_pb2.QuestionRequest(question="Do u error?")) + except: + pass + + self.assertIsNone(tracer.active_span) + self.assertIsNone(response) + + spans = self.recorder.queued_spans() + self.assertEqual(4, len(spans)) + + log_span = get_first_span_by_name(spans, 'log') + server_span = get_first_span_by_name(spans, 'rpc-server') + client_span = get_first_span_by_name(spans, 'rpc-client') + test_span = get_first_span_by_name(spans, 'sdk') + + assert(log_span) + assert(server_span) + assert(client_span) + assert(test_span) + + # Same traceId + self.assertEqual(server_span.t, client_span.t) + self.assertEqual(server_span.t, test_span.t) + + # Parent relationships + self.assertEqual(server_span.p, client_span.s) + self.assertEqual(client_span.p, test_span.s) + + # Error logging + self.assertFalse(test_span.error) + self.assertIsNone(test_span.ec) + self.assertTrue(client_span.error) + self.assertEqual(client_span.ec, 1) + self.assertFalse(server_span.error) + self.assertIsNone(server_span.ec) + + # rpc-server + self.assertEqual(server_span.n, 'rpc-server') + self.assertEqual(server_span.k, 1) + self.assertIsNotNone(server_span.stack) + self.assertEqual(2, len(server_span.stack)) + self.assertEqual(server_span.data.rpc.flavor, 'grpc') + self.assertEqual(server_span.data.rpc.call, '/stan.Stan/OneQuestionOneErrorResponse') + self.assertEqual(server_span.data.rpc.host, testenv["grpc_host"]) + self.assertEqual(server_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertIsNone(server_span.data.rpc.error) + + # rpc-client + self.assertEqual(client_span.n, 'rpc-client') + self.assertEqual(client_span.k, 2) + self.assertIsNotNone(client_span.stack) + self.assertEqual(client_span.data.rpc.flavor, 'grpc') + self.assertEqual(client_span.data.rpc.call, '/stan.Stan/OneQuestionOneErrorResponse') + self.assertEqual(client_span.data.rpc.host, testenv["grpc_host"]) + self.assertEqual(client_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertEqual(client_span.data.rpc.call_type, 'unary') + self.assertIsNotNone(client_span.data.rpc.error) + + # log + self.assertEqual(log_span.n, 'log') + self.assertIsNotNone(log_span.data.log) + self.assertEqual(log_span.data.log['message'], 'Exception calling application: Simulated error') + + # test-span + self.assertEqual(test_span.n, 'sdk') + self.assertEqual(test_span.data.sdk.name, 'test') From f4572119aba9791f9aaaed758bef25ea9c64fe87 Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Thu, 18 Jul 2019 21:37:17 +0200 Subject: [PATCH 14/15] Attempt to work-around build bug --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index c93f4c4d..ca024bf5 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -6,7 +6,7 @@ version: 2 jobs: python27: docker: - - image: circleci/python:2.7.16 + - image: circleci/python:2.7.15 # Specify service dependencies here if necessary # CircleCI maintains a library of pre-built images From cd39ca8543fd7fda03104282af8061305686650c Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Thu, 18 Jul 2019 21:43:24 +0200 Subject: [PATCH 15/15] Limit grpcio tests to python >3.5 --- runtests.py | 4 +++- tests/__init__.py | 26 ++++++++++++++------------ 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/runtests.py b/runtests.py index 9263717a..21ab79bc 100644 --- a/runtests.py +++ b/runtests.py @@ -5,7 +5,9 @@ command_line = [__file__, '--verbose'] if LooseVersion(sys.version) < LooseVersion('3.5.3'): - command_line.extend(['-e', 'asynqp', '-e', 'aiohttp', '-e', 'async', '-e', 'tornado']) + command_line.extend(['-e', 'asynqp', '-e', 'aiohttp', + '-e', 'async', '-e', 'tornado', + '-e', 'grpcio']) if LooseVersion(sys.version) >= LooseVersion('3.7.0'): command_line.extend(['-e', 'sudsjurko']) diff --git a/tests/__init__.py b/tests/__init__.py index 76fe07d3..a70d1a22 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -20,18 +20,20 @@ flask.start() -# Background RPC application -# -# Spawn the background RPC app that the tests will throw -# requests at. -import tests.apps.grpc_server -from .apps.grpc_server.stan_server import StanServicer -stan_servicer = StanServicer() -rpc_server_thread = threading.Thread(target=stan_servicer.start_server) -rpc_server_thread.daemon = True -rpc_server_thread.name = "Background RPC app" -print("Starting background RPC app...") -rpc_server_thread.start() +if sys.version_info >= (3, 5, 3): + # Background RPC application + # + # Spawn the background RPC app that the tests will throw + # requests at. + import tests.apps.grpc_server + from .apps.grpc_server.stan_server import StanServicer + stan_servicer = StanServicer() + rpc_server_thread = threading.Thread(target=stan_servicer.start_server) + rpc_server_thread.daemon = True + rpc_server_thread.name = "Background RPC app" + print("Starting background RPC app...") + rpc_server_thread.start() + if sys.version_info < (3, 7, 0): # Background Soap Server