diff --git a/.circleci/config.yml b/.circleci/config.yml index 158094d0..ca024bf5 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -6,13 +6,13 @@ version: 2 jobs: python27: docker: - - image: circleci/python:2.7.16 + - image: circleci/python:2.7.15 # Specify service dependencies here if necessary # CircleCI maintains a library of pre-built images # documented at https://circleci.com/docs/2.0/circleci-images/ - image: circleci/postgres:9.6.5-alpine-ram - - image: circleci/mariadb:10-ram + - image: circleci/mariadb:10.1-ram - image: circleci/redis:5.0.4 - image: rabbitmq:3.5.4 diff --git a/instana/__init__.py b/instana/__init__.py index 867be0e8..ad457014 100644 --- a/instana/__init__.py +++ b/instana/__init__.py @@ -69,6 +69,7 @@ def boot_agent(): from .instrumentation import mysqlclient from .instrumentation import flask + from .instrumentation import grpcio from .instrumentation.tornado import client from .instrumentation.tornado import server from .instrumentation import logging diff --git a/instana/binary_propagator.py b/instana/binary_propagator.py new file mode 100644 index 00000000..cfad7050 --- /dev/null +++ b/instana/binary_propagator.py @@ -0,0 +1,83 @@ +from __future__ import absolute_import + +import opentracing as ot + +from .log import logger +from .util import header_to_id +from .span_context import InstanaSpanContext + + +class BinaryPropagator(): + """ + A Propagator for TEXT_MAP. + """ + HEADER_KEY_T = b'x-instana-t' + HEADER_KEY_S = b'x-instana-s' + HEADER_KEY_L = b'x-instana-l' + + def inject(self, span_context, carrier): + try: + trace_id = str.encode(span_context.trace_id) + span_id = str.encode(span_context.span_id) + level = str.encode("1") + + if type(carrier) is dict or hasattr(carrier, "__dict__"): + carrier[self.HEADER_KEY_T] = trace_id + carrier[self.HEADER_KEY_S] = span_id + carrier[self.HEADER_KEY_L] = level + elif type(carrier) is list: + carrier.append((self.HEADER_KEY_T, trace_id)) + carrier.append((self.HEADER_KEY_S, span_id)) + carrier.append((self.HEADER_KEY_L, level)) + elif type(carrier) is tuple: + carrier = carrier.__add__(((self.HEADER_KEY_T, trace_id),)) + carrier = carrier.__add__(((self.HEADER_KEY_S, span_id),)) + carrier = carrier.__add__(((self.HEADER_KEY_L, level),)) + elif hasattr(carrier, '__setitem__'): + carrier.__setitem__(self.HEADER_KEY_T, trace_id) + carrier.__setitem__(self.HEADER_KEY_S, span_id) + carrier.__setitem__(self.HEADER_KEY_L, level) + else: + raise Exception("Unsupported carrier type", type(carrier)) + + return carrier + except Exception: + logger.debug("inject error:", exc_info=True) + + def extract(self, carrier): # noqa + trace_id = None + span_id = None + level = None + + try: + if type(carrier) is dict or hasattr(carrier, "__getitem__"): + dc = carrier + elif hasattr(carrier, "__dict__"): + dc = carrier.__dict__ + elif type(carrier) is list: + dc = dict(carrier) + else: + raise ot.SpanContextCorruptedException() + + for key, value in dc.items(): + if type(key) is str: + key = str.encode(key) + + if self.HEADER_KEY_T == key: + trace_id = header_to_id(value) + elif self.HEADER_KEY_S == key: + span_id = header_to_id(value) + elif self.HEADER_KEY_L == key: + level = value + + ctx = None + if trace_id is not None and span_id is not None: + ctx = InstanaSpanContext(span_id=span_id, + trace_id=trace_id, + level=level, + baggage={}, + sampled=True) + return ctx + + except Exception: + logger.debug("extract error:", exc_info=True) diff --git a/instana/http_propagator.py b/instana/http_propagator.py index 2413b14b..b4bde7d0 100644 --- a/instana/http_propagator.py +++ b/instana/http_propagator.py @@ -1,7 +1,7 @@ from __future__ import absolute_import import opentracing as ot -from basictracer.context import SpanContext +from .span_context import InstanaSpanContext from .log import logger from .util import header_to_id @@ -62,6 +62,7 @@ def inject(self, span_context, carrier): def extract(self, carrier): # noqa trace_id = None span_id = None + level = 1 try: if type(carrier) is dict or hasattr(carrier, "__getitem__"): @@ -82,18 +83,23 @@ def extract(self, carrier): # noqa trace_id = header_to_id(dc[key]) elif self.LC_HEADER_KEY_S == lc_key: span_id = header_to_id(dc[key]) + elif self.LC_HEADER_KEY_L == lc_key: + level = dc[key] elif self.ALT_LC_HEADER_KEY_T == lc_key: trace_id = header_to_id(dc[key]) elif self.ALT_LC_HEADER_KEY_S == lc_key: span_id = header_to_id(dc[key]) + elif self.ALT_LC_HEADER_KEY_L == lc_key: + level = dc[key] ctx = None if trace_id is not None and span_id is not None: - ctx = SpanContext(span_id=span_id, - trace_id=trace_id, - baggage={}, - sampled=True) + ctx = InstanaSpanContext(span_id=span_id, + trace_id=trace_id, + level=level, + baggage={}, + sampled=True) return ctx except Exception: diff --git a/instana/instrumentation/grpcio.py b/instana/instrumentation/grpcio.py new file mode 100644 index 00000000..d5f898dd --- /dev/null +++ b/instana/instrumentation/grpcio.py @@ -0,0 +1,258 @@ +from __future__ import absolute_import + +import wrapt +import opentracing + +from ..log import logger +from ..singletons import tracer + +try: + import grpc + from grpc._channel import _UnaryUnaryMultiCallable, _StreamUnaryMultiCallable, \ + _UnaryStreamMultiCallable, _StreamStreamMultiCallable + + SUPPORTED_TYPES = [ _UnaryUnaryMultiCallable, + _StreamUnaryMultiCallable, + _UnaryStreamMultiCallable, + _StreamStreamMultiCallable ] + + def collect_tags(span, instance, argv, kwargs): + try: + span.set_tag('rpc.flavor', 'grpc') + + if type(instance) in SUPPORTED_TYPES: + method = instance._method.decode() + target = instance._channel.target().decode() + elif type(argv[0]) is grpc._cython.cygrpc.RequestCallEvent: + method = argv[0].call_details.method.decode() + target = argv[0].call_details.host.decode() + elif len(argv) > 2: + method = argv[2][2][1]._method.decode() + target = argv[2][2][1]._channel.target().decode() + + span.set_tag('rpc.call', method) + + parts = target.split(':') + if len(parts) == 2: + span.set_tag('rpc.host', parts[0]) + span.set_tag('rpc.port', parts[1]) + except: + logger.debug("grpc.collect_tags non-fatal error", exc_info=True) + finally: + return span + + + @wrapt.patch_function_wrapper('grpc._channel', '_UnaryUnaryMultiCallable.with_call') + def unary_unary_with_call_with_instana(wrapped, instance, argv, kwargs): + parent_span = tracer.active_span + + # If we're not tracing, just return + if parent_span is None: + return wrapped(*argv, **kwargs) + + with tracer.start_active_span("rpc-client", child_of=parent_span) as scope: + try: + if "metadata" not in kwargs: + kwargs["metadata"] = [] + + kwargs["metadata"] = tracer.inject(scope.span.context, opentracing.Format.BINARY, kwargs['metadata']) + collect_tags(scope.span, instance, argv, kwargs) + scope.span.set_tag('rpc.call_type', 'unary') + + rv = wrapped(*argv, **kwargs) + except Exception as e: + scope.span.log_exception(e) + raise + else: + return rv + + @wrapt.patch_function_wrapper('grpc._channel', '_UnaryUnaryMultiCallable.future') + def unary_unary_future_with_instana(wrapped, instance, argv, kwargs): + parent_span = tracer.active_span + + # If we're not tracing, just return + if parent_span is None: + return wrapped(*argv, **kwargs) + + with tracer.start_active_span("rpc-client", child_of=parent_span) as scope: + try: + if "metadata" not in kwargs: + kwargs["metadata"] = [] + + kwargs["metadata"] = tracer.inject(scope.span.context, opentracing.Format.BINARY, kwargs['metadata']) + collect_tags(scope.span, instance, argv, kwargs) + scope.span.set_tag('rpc.call_type', 'unary') + + rv = wrapped(*argv, **kwargs) + except Exception as e: + scope.span.log_exception(e) + raise + else: + return rv + + @wrapt.patch_function_wrapper('grpc._channel', '_UnaryUnaryMultiCallable.__call__') + def unary_unary_call_with_instana(wrapped, instance, argv, kwargs): + parent_span = tracer.active_span + + # If we're not tracing, just return + if parent_span is None: + return wrapped(*argv, **kwargs) + + with tracer.start_active_span("rpc-client", child_of=parent_span) as scope: + try: + if not "metadata" in kwargs: + kwargs["metadata"] = [] + + kwargs["metadata"] = tracer.inject(scope.span.context, opentracing.Format.BINARY, kwargs['metadata']) + collect_tags(scope.span, instance, argv, kwargs) + scope.span.set_tag('rpc.call_type', 'unary') + + rv = wrapped(*argv, **kwargs) + except Exception as e: + scope.span.log_exception(e) + raise + else: + return rv + + @wrapt.patch_function_wrapper('grpc._channel', '_StreamUnaryMultiCallable.__call__') + def stream_unary_call_with_instana(wrapped, instance, argv, kwargs): + parent_span = tracer.active_span + + # If we're not tracing, just return + if parent_span is None: + return wrapped(*argv, **kwargs) + + with tracer.start_active_span("rpc-client", child_of=parent_span) as scope: + try: + if not "metadata" in kwargs: + kwargs["metadata"] = [] + + kwargs["metadata"] = tracer.inject(scope.span.context, opentracing.Format.BINARY, kwargs['metadata']) + collect_tags(scope.span, instance, argv, kwargs) + scope.span.set_tag('rpc.call_type', 'stream') + + rv = wrapped(*argv, **kwargs) + except Exception as e: + scope.span.log_exception(e) + raise + else: + return rv + + @wrapt.patch_function_wrapper('grpc._channel', '_StreamUnaryMultiCallable.with_call') + def stream_unary_with_call_with_instana(wrapped, instance, argv, kwargs): + parent_span = tracer.active_span + + # If we're not tracing, just return + if parent_span is None: + return wrapped(*argv, **kwargs) + + with tracer.start_active_span("rpc-client", child_of=parent_span) as scope: + try: + if not "metadata" in kwargs: + kwargs["metadata"] = [] + + kwargs["metadata"] = tracer.inject(scope.span.context, opentracing.Format.BINARY, kwargs['metadata']) + collect_tags(scope.span, instance, argv, kwargs) + scope.span.set_tag('rpc.call_type', 'stream') + + rv = wrapped(*argv, **kwargs) + except Exception as e: + scope.span.log_exception(e) + raise + else: + return rv + + @wrapt.patch_function_wrapper('grpc._channel', '_StreamUnaryMultiCallable.future') + def stream_unary_future_with_instana(wrapped, instance, argv, kwargs): + parent_span = tracer.active_span + + # If we're not tracing, just return + if parent_span is None: + return wrapped(*argv, **kwargs) + + with tracer.start_active_span("rpc-client", child_of=parent_span) as scope: + try: + if not "metadata" in kwargs: + kwargs["metadata"] = [] + + kwargs["metadata"] = tracer.inject(scope.span.context, opentracing.Format.BINARY, kwargs['metadata']) + collect_tags(scope.span, instance, argv, kwargs) + scope.span.set_tag('rpc.call_type', 'stream') + + rv = wrapped(*argv, **kwargs) + except Exception as e: + scope.span.log_exception(e) + raise + else: + return rv + + @wrapt.patch_function_wrapper('grpc._channel', '_UnaryStreamMultiCallable.__call__') + def unary_stream_call_with_instana(wrapped, instance, argv, kwargs): + parent_span = tracer.active_span + + # If we're not tracing, just return + if parent_span is None: + return wrapped(*argv, **kwargs) + + with tracer.start_active_span("rpc-client", child_of=parent_span) as scope: + try: + if not "metadata" in kwargs: + kwargs["metadata"] = [] + + kwargs["metadata"] = tracer.inject(scope.span.context, opentracing.Format.BINARY, kwargs['metadata']) + collect_tags(scope.span, instance, argv, kwargs) + scope.span.set_tag('rpc.call_type', 'stream') + + rv = wrapped(*argv, **kwargs) + except Exception as e: + scope.span.log_exception(e) + raise + else: + return rv + + @wrapt.patch_function_wrapper('grpc._channel', '_StreamStreamMultiCallable.__call__') + def stream_stream_call_with_instana(wrapped, instance, argv, kwargs): + parent_span = tracer.active_span + + # If we're not tracing, just return + if parent_span is None: + return wrapped(*argv, **kwargs) + + with tracer.start_active_span("rpc-client", child_of=parent_span) as scope: + try: + if not "metadata" in kwargs: + kwargs["metadata"] = [] + + kwargs["metadata"] = tracer.inject(scope.span.context, opentracing.Format.BINARY, kwargs['metadata']) + collect_tags(scope.span, instance, argv, kwargs) + scope.span.set_tag('rpc.call_type', 'stream') + + rv = wrapped(*argv, **kwargs) + except Exception as e: + scope.span.log_exception(e) + raise + else: + return rv + + @wrapt.patch_function_wrapper('grpc._server', '_call_behavior') + def call_behavior_with_instana(wrapped, instance, argv, kwargs): + # Prep any incoming context headers + metadata = argv[0].invocation_metadata + metadata_dict = {} + for c in metadata: + metadata_dict[c.key] = c.value + + ctx = tracer.extract(opentracing.Format.BINARY, metadata_dict) + + with tracer.start_active_span("rpc-server", child_of=ctx) as scope: + try: + collect_tags(scope.span, instance, argv, kwargs) + rv = wrapped(*argv, **kwargs) + except Exception as e: + scope.span.log_exception(e) + raise + else: + return rv + +except ImportError: + pass diff --git a/instana/span.py b/instana/span.py index c0efb4b8..402084e0 100644 --- a/instana/span.py +++ b/instana/span.py @@ -1,4 +1,5 @@ from basictracer.span import BasicSpan +from .log import logger class InstanaSpan(BasicSpan): @@ -8,11 +9,24 @@ def finish(self, finish_time=None): super(InstanaSpan, self).finish(finish_time) def log_exception(self, e): - if hasattr(e, '__str__'): - self.log_kv({'message': str(e)}) - elif hasattr(e, 'message') and e.message is not None: - self.log_kv({'message': e.message}) - - self.set_tag("error", True) - ec = self.tags.get('ec', 0) - self.set_tag("ec", ec+1) + try: + message = "" + + self.set_tag("error", True) + ec = self.tags.get('ec', 0) + self.set_tag("ec", ec+1) + + if hasattr(e, '__str__'): + message = str(e) + elif hasattr(e, 'message') and e.message is not None: + message = e.message + + if self.operation_name in ['rpc-server', 'rpc-client']: + self.set_tag('rpc.error', message) + + self.log_kv({'message': message}) + + except Exception: + logger.debug("span.log_exception", exc_info=True) + raise + diff --git a/instana/span_context.py b/instana/span_context.py new file mode 100644 index 00000000..6fd4d120 --- /dev/null +++ b/instana/span_context.py @@ -0,0 +1,22 @@ + +from basictracer.context import SpanContext + + +class InstanaSpanContext(SpanContext): + """ + SpanContext based on the Basic tracer implementation. + We subclass this so that we can also store 'level' and eventually + remove the basictracer dependency altogether. + """ + def __init__( + self, + trace_id=None, + span_id=None, + baggage=None, + sampled=True, + level=1): + self.level = level + + super(InstanaSpanContext, self).__init__(trace_id, span_id, baggage, sampled) + + diff --git a/instana/text_propagator.py b/instana/text_propagator.py index 7a5bdbd2..a71981b2 100644 --- a/instana/text_propagator.py +++ b/instana/text_propagator.py @@ -1,7 +1,7 @@ from __future__ import absolute_import import opentracing as ot -from basictracer.context import SpanContext +from .span_context import InstanaSpanContext from .log import logger from .util import header_to_id @@ -11,7 +11,6 @@ class TextPropagator(): """ A Propagator for TEXT_MAP. """ - HEADER_KEY_T = 'X-INSTANA-T' HEADER_KEY_S = 'X-INSTANA-S' HEADER_KEY_L = 'X-INSTANA-L' @@ -29,6 +28,10 @@ def inject(self, span_context, carrier): carrier.append((self.HEADER_KEY_T, trace_id)) carrier.append((self.HEADER_KEY_S, span_id)) carrier.append((self.HEADER_KEY_L, "1")) + elif type(carrier) is tuple: + carrier = carrier.__add__(((self.HEADER_KEY_T, trace_id),)) + carrier = carrier.__add__(((self.HEADER_KEY_S, span_id),)) + carrier = carrier.__add__(((self.HEADER_KEY_L, "1"),)) elif hasattr(carrier, '__setitem__'): carrier.__setitem__(self.HEADER_KEY_T, trace_id) carrier.__setitem__(self.HEADER_KEY_S, span_id) @@ -36,12 +39,14 @@ def inject(self, span_context, carrier): else: raise Exception("Unsupported carrier type", type(carrier)) + return carrier except Exception: logger.debug("inject error:", exc_info=True) def extract(self, carrier): # noqa trace_id = None span_id = None + level = 1 try: if type(carrier) is dict or hasattr(carrier, "__getitem__"): @@ -58,13 +63,16 @@ def extract(self, carrier): # noqa trace_id = header_to_id(dc[key]) elif self.HEADER_KEY_S == key: span_id = header_to_id(dc[key]) + elif self.HEADER_KEY_L == key: + level = dc[key] ctx = None if trace_id is not None and span_id is not None: - ctx = SpanContext(span_id=span_id, - trace_id=trace_id, - baggage={}, - sampled=True) + ctx = InstanaSpanContext(span_id=span_id, + trace_id=trace_id, + level=level, + baggage={}, + sampled=True) return ctx except Exception: diff --git a/instana/tracer.py b/instana/tracer.py index e35b5974..ea0b4fe6 100644 --- a/instana/tracer.py +++ b/instana/tracer.py @@ -7,13 +7,14 @@ import opentracing as ot from basictracer import BasicTracer -from basictracer.context import SpanContext +from .binary_propagator import BinaryPropagator from .http_propagator import HTTPPropagator +from .text_propagator import TextPropagator +from .span_context import InstanaSpanContext from .options import Options from .recorder import InstanaRecorder, InstanaSampler from .span import InstanaSpan -from .text_propagator import TextPropagator from .util import generate_id @@ -28,6 +29,7 @@ def __init__(self, options=Options(), scope_manager=None, recorder=None): self._propagators[ot.Format.HTTP_HEADERS] = HTTPPropagator() self._propagators[ot.Format.TEXT_MAP] = TextPropagator() + self._propagators[ot.Format.BINARY] = BinaryPropagator() def handle_fork(self): # Nothing to do for the Tracer; Pass onto Recorder @@ -83,7 +85,7 @@ def start_span(self, # Assemble the child ctx gid = generate_id() - ctx = SpanContext(span_id=gid) + ctx = InstanaSpanContext(span_id=gid) if parent_ctx is not None: if parent_ctx._baggage is not None: ctx._baggage = parent_ctx._baggage.copy() @@ -112,7 +114,7 @@ def start_span(self, def inject(self, span_context, format, carrier): if format in self._propagators: - self._propagators[format].inject(span_context, carrier) + return self._propagators[format].inject(span_context, carrier) else: raise ot.UnsupportedFormatException() diff --git a/runtests.py b/runtests.py index 9263717a..21ab79bc 100644 --- a/runtests.py +++ b/runtests.py @@ -5,7 +5,9 @@ command_line = [__file__, '--verbose'] if LooseVersion(sys.version) < LooseVersion('3.5.3'): - command_line.extend(['-e', 'asynqp', '-e', 'aiohttp', '-e', 'async', '-e', 'tornado']) + command_line.extend(['-e', 'asynqp', '-e', 'aiohttp', + '-e', 'async', '-e', 'tornado', + '-e', 'grpcio']) if LooseVersion(sys.version) >= LooseVersion('3.7.0'): command_line.extend(['-e', 'sudsjurko']) diff --git a/setup.py b/setup.py index a81ac9d1..a7ed1517 100644 --- a/setup.py +++ b/setup.py @@ -72,6 +72,7 @@ def check_setuptools(): 'django>=1.11,<2.2', 'nose>=1.0', 'flask>=0.12.2', + 'grpcio>=1.18.0', 'lxml>=3.4', 'mock>=2.0.0', 'mysqlclient>=1.3.14;python_version>="3.5"', diff --git a/tests/__init__.py b/tests/__init__.py index d983039e..a70d1a22 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -20,6 +20,21 @@ flask.start() +if sys.version_info >= (3, 5, 3): + # Background RPC application + # + # Spawn the background RPC app that the tests will throw + # requests at. + import tests.apps.grpc_server + from .apps.grpc_server.stan_server import StanServicer + stan_servicer = StanServicer() + rpc_server_thread = threading.Thread(target=stan_servicer.start_server) + rpc_server_thread.daemon = True + rpc_server_thread.name = "Background RPC app" + print("Starting background RPC app...") + rpc_server_thread.start() + + if sys.version_info < (3, 7, 0): # Background Soap Server from .apps.soapserver4132 import soapserver diff --git a/tests/apps/grpc_server/README.md b/tests/apps/grpc_server/README.md new file mode 100644 index 00000000..a02cdf3f --- /dev/null +++ b/tests/apps/grpc_server/README.md @@ -0,0 +1,8 @@ +To regenerate from the proto file: + +```bash +pip install grpcio grpcio-tools +python -m grpc_tools.protoc --proto_path=. --python_out=. --grpc_python_out=. ./stan.proto +``` + +Inspired by: https://technokeeda.com/programming/grpc-python-tutorial/ \ No newline at end of file diff --git a/tests/apps/grpc_server/__init__.py b/tests/apps/grpc_server/__init__.py new file mode 100644 index 00000000..779dcbfa --- /dev/null +++ b/tests/apps/grpc_server/__init__.py @@ -0,0 +1 @@ +# __all__ = ["digestor_pb2", "digestor_pb2_grpc"] \ No newline at end of file diff --git a/tests/apps/grpc_server/stan.proto b/tests/apps/grpc_server/stan.proto new file mode 100644 index 00000000..78084159 --- /dev/null +++ b/tests/apps/grpc_server/stan.proto @@ -0,0 +1,27 @@ +syntax = "proto3"; + +package stan; + +service Stan{ + // Unary + rpc OneQuestionOneResponse(QuestionRequest) returns (QuestionResponse) {} + + // Streaming + rpc ManyQuestionsOneResponse(stream QuestionRequest) returns (QuestionResponse){} + rpc OneQuestionManyResponses(QuestionRequest) returns (stream QuestionResponse){} + rpc ManyQuestionsManyReponses(stream QuestionRequest) returns (stream QuestionResponse){} + + // Error Testing + rpc OneQuestionOneErrorResponse(QuestionRequest) returns (QuestionResponse) {} + rpc OneErroredQuestionOneResponse(QuestionRequest) returns (QuestionResponse) {} +} + + +message QuestionRequest { + string question = 1; +} + +message QuestionResponse { + string answer = 1; + bool was_answered = 2; +} diff --git a/tests/apps/grpc_server/stan_pb2.py b/tests/apps/grpc_server/stan_pb2.py new file mode 100644 index 00000000..28b6cf69 --- /dev/null +++ b/tests/apps/grpc_server/stan_pb2.py @@ -0,0 +1,184 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: stan.proto + +import sys +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='stan.proto', + package='stan', + syntax='proto3', + serialized_options=None, + serialized_pb=_b('\n\nstan.proto\x12\x04stan\"#\n\x0fQuestionRequest\x12\x10\n\x08question\x18\x01 \x01(\t\"8\n\x10QuestionResponse\x12\x0e\n\x06\x61nswer\x18\x01 \x01(\t\x12\x14\n\x0cwas_answered\x18\x02 \x01(\x08\x32\xe3\x03\n\x04Stan\x12I\n\x16OneQuestionOneResponse\x12\x15.stan.QuestionRequest\x1a\x16.stan.QuestionResponse\"\x00\x12M\n\x18ManyQuestionsOneResponse\x12\x15.stan.QuestionRequest\x1a\x16.stan.QuestionResponse\"\x00(\x01\x12M\n\x18OneQuestionManyResponses\x12\x15.stan.QuestionRequest\x1a\x16.stan.QuestionResponse\"\x00\x30\x01\x12P\n\x19ManyQuestionsManyReponses\x12\x15.stan.QuestionRequest\x1a\x16.stan.QuestionResponse\"\x00(\x01\x30\x01\x12N\n\x1bOneQuestionOneErrorResponse\x12\x15.stan.QuestionRequest\x1a\x16.stan.QuestionResponse\"\x00\x12P\n\x1dOneErroredQuestionOneResponse\x12\x15.stan.QuestionRequest\x1a\x16.stan.QuestionResponse\"\x00\x62\x06proto3') +) + + + + +_QUESTIONREQUEST = _descriptor.Descriptor( + name='QuestionRequest', + full_name='stan.QuestionRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='question', full_name='stan.QuestionRequest.question', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=20, + serialized_end=55, +) + + +_QUESTIONRESPONSE = _descriptor.Descriptor( + name='QuestionResponse', + full_name='stan.QuestionResponse', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='answer', full_name='stan.QuestionResponse.answer', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='was_answered', full_name='stan.QuestionResponse.was_answered', index=1, + number=2, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=57, + serialized_end=113, +) + +DESCRIPTOR.message_types_by_name['QuestionRequest'] = _QUESTIONREQUEST +DESCRIPTOR.message_types_by_name['QuestionResponse'] = _QUESTIONRESPONSE +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +QuestionRequest = _reflection.GeneratedProtocolMessageType('QuestionRequest', (_message.Message,), dict( + DESCRIPTOR = _QUESTIONREQUEST, + __module__ = 'stan_pb2' + # @@protoc_insertion_point(class_scope:stan.QuestionRequest) + )) +_sym_db.RegisterMessage(QuestionRequest) + +QuestionResponse = _reflection.GeneratedProtocolMessageType('QuestionResponse', (_message.Message,), dict( + DESCRIPTOR = _QUESTIONRESPONSE, + __module__ = 'stan_pb2' + # @@protoc_insertion_point(class_scope:stan.QuestionResponse) + )) +_sym_db.RegisterMessage(QuestionResponse) + + + +_STAN = _descriptor.ServiceDescriptor( + name='Stan', + full_name='stan.Stan', + file=DESCRIPTOR, + index=0, + serialized_options=None, + serialized_start=116, + serialized_end=599, + methods=[ + _descriptor.MethodDescriptor( + name='OneQuestionOneResponse', + full_name='stan.Stan.OneQuestionOneResponse', + index=0, + containing_service=None, + input_type=_QUESTIONREQUEST, + output_type=_QUESTIONRESPONSE, + serialized_options=None, + ), + _descriptor.MethodDescriptor( + name='ManyQuestionsOneResponse', + full_name='stan.Stan.ManyQuestionsOneResponse', + index=1, + containing_service=None, + input_type=_QUESTIONREQUEST, + output_type=_QUESTIONRESPONSE, + serialized_options=None, + ), + _descriptor.MethodDescriptor( + name='OneQuestionManyResponses', + full_name='stan.Stan.OneQuestionManyResponses', + index=2, + containing_service=None, + input_type=_QUESTIONREQUEST, + output_type=_QUESTIONRESPONSE, + serialized_options=None, + ), + _descriptor.MethodDescriptor( + name='ManyQuestionsManyReponses', + full_name='stan.Stan.ManyQuestionsManyReponses', + index=3, + containing_service=None, + input_type=_QUESTIONREQUEST, + output_type=_QUESTIONRESPONSE, + serialized_options=None, + ), + _descriptor.MethodDescriptor( + name='OneQuestionOneErrorResponse', + full_name='stan.Stan.OneQuestionOneErrorResponse', + index=4, + containing_service=None, + input_type=_QUESTIONREQUEST, + output_type=_QUESTIONRESPONSE, + serialized_options=None, + ), + _descriptor.MethodDescriptor( + name='OneErroredQuestionOneResponse', + full_name='stan.Stan.OneErroredQuestionOneResponse', + index=5, + containing_service=None, + input_type=_QUESTIONREQUEST, + output_type=_QUESTIONRESPONSE, + serialized_options=None, + ), +]) +_sym_db.RegisterServiceDescriptor(_STAN) + +DESCRIPTOR.services_by_name['Stan'] = _STAN + +# @@protoc_insertion_point(module_scope) diff --git a/tests/apps/grpc_server/stan_pb2_grpc.py b/tests/apps/grpc_server/stan_pb2_grpc.py new file mode 100644 index 00000000..61643119 --- /dev/null +++ b/tests/apps/grpc_server/stan_pb2_grpc.py @@ -0,0 +1,131 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +import grpc + +import tests.apps.grpc_server.stan_pb2 as stan__pb2 + + +class StanStub(object): + # missing associated documentation comment in .proto file + pass + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.OneQuestionOneResponse = channel.unary_unary( + '/stan.Stan/OneQuestionOneResponse', + request_serializer=stan__pb2.QuestionRequest.SerializeToString, + response_deserializer=stan__pb2.QuestionResponse.FromString, + ) + self.ManyQuestionsOneResponse = channel.stream_unary( + '/stan.Stan/ManyQuestionsOneResponse', + request_serializer=stan__pb2.QuestionRequest.SerializeToString, + response_deserializer=stan__pb2.QuestionResponse.FromString, + ) + self.OneQuestionManyResponses = channel.unary_stream( + '/stan.Stan/OneQuestionManyResponses', + request_serializer=stan__pb2.QuestionRequest.SerializeToString, + response_deserializer=stan__pb2.QuestionResponse.FromString, + ) + self.ManyQuestionsManyReponses = channel.stream_stream( + '/stan.Stan/ManyQuestionsManyReponses', + request_serializer=stan__pb2.QuestionRequest.SerializeToString, + response_deserializer=stan__pb2.QuestionResponse.FromString, + ) + self.OneQuestionOneErrorResponse = channel.unary_unary( + '/stan.Stan/OneQuestionOneErrorResponse', + request_serializer=stan__pb2.QuestionRequest.SerializeToString, + response_deserializer=stan__pb2.QuestionResponse.FromString, + ) + self.OneErroredQuestionOneResponse = channel.unary_unary( + '/stan.Stan/OneErroredQuestionOneResponse', + request_serializer=stan__pb2.QuestionRequest.SerializeToString, + response_deserializer=stan__pb2.QuestionResponse.FromString, + ) + + +class StanServicer(object): + # missing associated documentation comment in .proto file + pass + + def OneQuestionOneResponse(self, request, context): + """Unary + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def ManyQuestionsOneResponse(self, request_iterator, context): + """Streaming + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def OneQuestionManyResponses(self, request, context): + # missing associated documentation comment in .proto file + pass + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def ManyQuestionsManyReponses(self, request_iterator, context): + # missing associated documentation comment in .proto file + pass + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def OneQuestionOneErrorResponse(self, request, context): + """Error Testing + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def OneErroredQuestionOneResponse(self, request, context): + # missing associated documentation comment in .proto file + pass + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_StanServicer_to_server(servicer, server): + rpc_method_handlers = { + 'OneQuestionOneResponse': grpc.unary_unary_rpc_method_handler( + servicer.OneQuestionOneResponse, + request_deserializer=stan__pb2.QuestionRequest.FromString, + response_serializer=stan__pb2.QuestionResponse.SerializeToString, + ), + 'ManyQuestionsOneResponse': grpc.stream_unary_rpc_method_handler( + servicer.ManyQuestionsOneResponse, + request_deserializer=stan__pb2.QuestionRequest.FromString, + response_serializer=stan__pb2.QuestionResponse.SerializeToString, + ), + 'OneQuestionManyResponses': grpc.unary_stream_rpc_method_handler( + servicer.OneQuestionManyResponses, + request_deserializer=stan__pb2.QuestionRequest.FromString, + response_serializer=stan__pb2.QuestionResponse.SerializeToString, + ), + 'ManyQuestionsManyReponses': grpc.stream_stream_rpc_method_handler( + servicer.ManyQuestionsManyReponses, + request_deserializer=stan__pb2.QuestionRequest.FromString, + response_serializer=stan__pb2.QuestionResponse.SerializeToString, + ), + 'OneQuestionOneErrorResponse': grpc.unary_unary_rpc_method_handler( + servicer.OneQuestionOneErrorResponse, + request_deserializer=stan__pb2.QuestionRequest.FromString, + response_serializer=stan__pb2.QuestionResponse.SerializeToString, + ), + 'OneErroredQuestionOneResponse': grpc.unary_unary_rpc_method_handler( + servicer.OneErroredQuestionOneResponse, + request_deserializer=stan__pb2.QuestionRequest.FromString, + response_serializer=stan__pb2.QuestionResponse.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'stan.Stan', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) diff --git a/tests/apps/grpc_server/stan_server.py b/tests/apps/grpc_server/stan_server.py new file mode 100644 index 00000000..cada6673 --- /dev/null +++ b/tests/apps/grpc_server/stan_server.py @@ -0,0 +1,82 @@ +import grpc +import time +import tests.apps.grpc_server.stan_pb2 as stan_pb2 +import tests.apps.grpc_server.stan_pb2_grpc as stan_pb2_grpc +from concurrent import futures + +from ...helpers import testenv + +testenv["grpc_port"] = 10814 +testenv["grpc_host"] = "127.0.0.1" +testenv["grpc_server"] = testenv["grpc_host"] + ":" + str(testenv["grpc_port"]) + + +class StanServicer(stan_pb2_grpc.StanServicer): + """ + gRPC server for Stan Service + """ + def __init__(self, *args, **kwargs): + self.server_port = testenv['grpc_port'] + + def OneQuestionOneResponse(self, request, context): + # print("😇:I was asked: %s" % request.question) + response = """\ +Invention, my dear friends, is 93% perspiration, 6% electricity, \ +4% evaporation, and 2% butterscotch ripple. – Willy Wonka""" + result = {'answer': response, 'was_answered': True} + return stan_pb2.QuestionResponse(**result) + + def ManyQuestionsOneResponse(self, request_iterator, context): + for request in request_iterator: + # print("😇:I was asked: %s" % request.question) + pass + + result = {'answer': 'Ok', 'was_answered': True} + return stan_pb2.QuestionResponse(**result) + + def OneQuestionManyResponses(self, request, context): + # print("😇:I was asked: %s" % request.question) + for count in range(6): + result = {'answer': 'Ok', 'was_answered': True} + yield stan_pb2.QuestionResponse(**result) + + def ManyQuestionsManyReponses(self, request_iterator, context): + for request in request_iterator: + # print("😇:I was asked: %s" % request.question) + result = {'answer': 'Ok', 'was_answered': True} + yield stan_pb2.QuestionResponse(**result) + + def OneQuestionOneErrorResponse(self, request, context): + # print("😇:I was asked: %s" % request.question) + raise Exception('Simulated error') + result = {'answer': "ThisError", 'was_answered': True} + return stan_pb2.QuestionResponse(**result) + + def start_server(self): + """ + Function which actually starts the gRPC server, and preps + it for serving incoming connections + """ + # declare a server object with desired number + # of thread pool workers. + rpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + + # This line can be ignored + stan_pb2_grpc.add_StanServicer_to_server(StanServicer(), rpc_server) + + # bind the server to the port defined above + rpc_server.add_insecure_port('[::]:{}'.format(self.server_port)) + + # start the server + rpc_server.start() + + try: + # need an infinite loop since the above + # code is non blocking, and if I don't do this + # the program will exit + while True: + time.sleep(60*60*60) + except KeyboardInterrupt: + rpc_server.stop(0) + print('Stan as a Service RPC Server Stopped ...') + diff --git a/tests/test_grpcio.py b/tests/test_grpcio.py new file mode 100644 index 00000000..493c78fa --- /dev/null +++ b/tests/test_grpcio.py @@ -0,0 +1,640 @@ +from __future__ import absolute_import + +import time +import unittest +import random + +import grpc + +import tests.apps.grpc_server.stan_pb2 as stan_pb2 +import tests.apps.grpc_server.stan_pb2_grpc as stan_pb2_grpc + +from instana.singletons import tracer +from .helpers import testenv, get_first_span_by_name + + +class TestGRPCIO(unittest.TestCase): + def setUp(self): + """ Clear all spans before a test run """ + self.recorder = tracer.recorder + self.recorder.clear_spans() + self.channel = grpc.insecure_channel(testenv["grpc_server"]) + self.server_stub = stan_pb2_grpc.StanStub(self.channel) + # The grpc client apparently needs a second to connect and initialize + time.sleep(1) + + def tearDown(self): + """ Do nothing for now """ + pass + + def generate_questions(self): + """ Used in the streaming grpc tests """ + questions = [ + stan_pb2.QuestionRequest(question="Are you there?"), + stan_pb2.QuestionRequest(question="What time is it?"), + stan_pb2.QuestionRequest(question="Where in the world is Waldo?"), + stan_pb2.QuestionRequest(question="What did one campfire say to the other?"), + stan_pb2.QuestionRequest(question="Is cereal soup?"), + stan_pb2.QuestionRequest(question="What is always coming, but never arrives?") + ] + for q in questions: + yield q + time.sleep(random.uniform(0.2, 0.5)) + + def test_vanilla_request(self): + response = self.server_stub.OneQuestionOneResponse(stan_pb2.QuestionRequest(question="Are you there?")) + self.assertEqual(response.answer, "Invention, my dear friends, is 93% perspiration, 6% electricity, 4% evaporation, and 2% butterscotch ripple. – Willy Wonka") + + def test_vanilla_request_via_with_call(self): + response = self.server_stub.OneQuestionOneResponse.with_call(stan_pb2.QuestionRequest(question="Are you there?")) + self.assertEqual(response[0].answer, "Invention, my dear friends, is 93% perspiration, 6% electricity, 4% evaporation, and 2% butterscotch ripple. – Willy Wonka") + + def test_unary_one_to_one(self): + with tracer.start_active_span('test'): + response = self.server_stub.OneQuestionOneResponse(stan_pb2.QuestionRequest(question="Are you there?")) + + self.assertIsNone(tracer.active_span) + self.assertIsNotNone(response) + self.assertEqual(response.answer, "Invention, my dear friends, is 93% perspiration, 6% electricity, 4% evaporation, and 2% butterscotch ripple. – Willy Wonka") + + spans = self.recorder.queued_spans() + self.assertEqual(3, len(spans)) + + server_span = get_first_span_by_name(spans, 'rpc-server') + client_span = get_first_span_by_name(spans, 'rpc-client') + test_span = get_first_span_by_name(spans, 'sdk') + + assert(server_span) + assert(client_span) + assert(test_span) + + # Same traceId + self.assertEqual(server_span.t, client_span.t) + self.assertEqual(server_span.t, test_span.t) + + # Parent relationships + self.assertEqual(server_span.p, client_span.s) + self.assertEqual(client_span.p, test_span.s) + + # Error logging + self.assertFalse(test_span.error) + self.assertIsNone(test_span.ec) + self.assertFalse(client_span.error) + self.assertIsNone(client_span.ec) + self.assertFalse(server_span.error) + self.assertIsNone(server_span.ec) + + # rpc-server + self.assertEqual(server_span.n, 'rpc-server') + self.assertEqual(server_span.k, 1) + self.assertIsNotNone(server_span.stack) + self.assertEqual(2, len(server_span.stack)) + self.assertEqual(server_span.data.rpc.flavor, 'grpc') + self.assertEqual(server_span.data.rpc.call, '/stan.Stan/OneQuestionOneResponse') + self.assertEqual(server_span.data.rpc.host, testenv["grpc_host"]) + self.assertEqual(server_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertIsNone(server_span.data.rpc.error) + + # rpc-client + self.assertEqual(client_span.n, 'rpc-client') + self.assertEqual(client_span.k, 2) + self.assertIsNotNone(client_span.stack) + self.assertEqual(client_span.data.rpc.flavor, 'grpc') + self.assertEqual(client_span.data.rpc.call, '/stan.Stan/OneQuestionOneResponse') + self.assertEqual(client_span.data.rpc.host, testenv["grpc_host"]) + self.assertEqual(client_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertEqual(client_span.data.rpc.call_type, 'unary') + self.assertIsNone(client_span.data.rpc.error) + + # test-span + self.assertEqual(test_span.n, 'sdk') + self.assertEqual(test_span.data.sdk.name, 'test') + + def test_streaming_many_to_one(self): + + with tracer.start_active_span('test'): + response = self.server_stub.ManyQuestionsOneResponse(self.generate_questions()) + + self.assertIsNone(tracer.active_span) + self.assertIsNotNone(response) + + self.assertEqual('Ok', response.answer) + self.assertEqual(True, response.was_answered) + + spans = self.recorder.queued_spans() + self.assertEqual(3, len(spans)) + + server_span = get_first_span_by_name(spans, 'rpc-server') + client_span = get_first_span_by_name(spans, 'rpc-client') + test_span = get_first_span_by_name(spans, 'sdk') + + assert(server_span) + assert(client_span) + assert(test_span) + + # Same traceId + self.assertEqual(server_span.t, client_span.t) + self.assertEqual(server_span.t, test_span.t) + + # Parent relationships + self.assertEqual(server_span.p, client_span.s) + self.assertEqual(client_span.p, test_span.s) + + # Error logging + self.assertFalse(test_span.error) + self.assertIsNone(test_span.ec) + self.assertFalse(client_span.error) + self.assertIsNone(client_span.ec) + self.assertFalse(server_span.error) + self.assertIsNone(server_span.ec) + + # rpc-server + self.assertEqual(server_span.n, 'rpc-server') + self.assertEqual(server_span.k, 1) + self.assertIsNotNone(server_span.stack) + self.assertEqual(2, len(server_span.stack)) + self.assertEqual(server_span.data.rpc.flavor, 'grpc') + self.assertEqual(server_span.data.rpc.call, '/stan.Stan/ManyQuestionsOneResponse') + self.assertEqual(server_span.data.rpc.host, testenv["grpc_host"]) + self.assertEqual(server_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertIsNone(server_span.data.rpc.error) + + # rpc-client + self.assertEqual(client_span.n, 'rpc-client') + self.assertEqual(client_span.k, 2) + self.assertIsNotNone(client_span.stack) + self.assertEqual(client_span.data.rpc.flavor, 'grpc') + self.assertEqual(client_span.data.rpc.call, '/stan.Stan/ManyQuestionsOneResponse') + self.assertEqual(client_span.data.rpc.host, testenv["grpc_host"]) + self.assertEqual(client_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertEqual(client_span.data.rpc.call_type, 'stream') + self.assertIsNone(client_span.data.rpc.error) + + # test-span + self.assertEqual(test_span.n, 'sdk') + self.assertEqual(test_span.data.sdk.name, 'test') + + def test_streaming_one_to_many(self): + + with tracer.start_active_span('test'): + responses = self.server_stub.OneQuestionManyResponses(stan_pb2.QuestionRequest(question="Are you there?")) + + self.assertIsNone(tracer.active_span) + self.assertIsNotNone(responses) + + final_answers = [] + for response in responses: + final_answers.append(response) + + self.assertEqual(len(final_answers), 6) + + spans = self.recorder.queued_spans() + self.assertEqual(3, len(spans)) + + server_span = get_first_span_by_name(spans, 'rpc-server') + client_span = get_first_span_by_name(spans, 'rpc-client') + test_span = get_first_span_by_name(spans, 'sdk') + + assert(server_span) + assert(client_span) + assert(test_span) + + # Same traceId + self.assertEqual(server_span.t, client_span.t) + self.assertEqual(server_span.t, test_span.t) + + # Parent relationships + self.assertEqual(server_span.p, client_span.s) + self.assertEqual(client_span.p, test_span.s) + + # Error logging + self.assertFalse(test_span.error) + self.assertIsNone(test_span.ec) + self.assertFalse(client_span.error) + self.assertIsNone(client_span.ec) + self.assertFalse(server_span.error) + self.assertIsNone(server_span.ec) + + # rpc-server + self.assertEqual(server_span.n, 'rpc-server') + self.assertEqual(server_span.k, 1) + self.assertIsNotNone(server_span.stack) + self.assertEqual(2, len(server_span.stack)) + self.assertEqual(server_span.data.rpc.flavor, 'grpc') + self.assertEqual(server_span.data.rpc.call, '/stan.Stan/OneQuestionManyResponses') + self.assertEqual(server_span.data.rpc.host, testenv["grpc_host"]) + self.assertEqual(server_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertIsNone(server_span.data.rpc.error) + + # rpc-client + self.assertEqual(client_span.n, 'rpc-client') + self.assertEqual(client_span.k, 2) + self.assertIsNotNone(client_span.stack) + self.assertEqual(client_span.data.rpc.flavor, 'grpc') + self.assertEqual(client_span.data.rpc.call, '/stan.Stan/OneQuestionManyResponses') + self.assertEqual(client_span.data.rpc.host, testenv["grpc_host"]) + self.assertEqual(client_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertEqual(client_span.data.rpc.call_type, 'stream') + self.assertIsNone(client_span.data.rpc.error) + + # test-span + self.assertEqual(test_span.n, 'sdk') + self.assertEqual(test_span.data.sdk.name, 'test') + + def test_streaming_many_to_many(self): + with tracer.start_active_span('test'): + responses = self.server_stub.ManyQuestionsManyReponses(self.generate_questions()) + + self.assertIsNone(tracer.active_span) + self.assertIsNotNone(responses) + + final_answers = [] + for response in responses: + final_answers.append(response) + + self.assertEqual(len(final_answers), 6) + + spans = self.recorder.queued_spans() + self.assertEqual(3, len(spans)) + + server_span = get_first_span_by_name(spans, 'rpc-server') + client_span = get_first_span_by_name(spans, 'rpc-client') + test_span = get_first_span_by_name(spans, 'sdk') + + assert(server_span) + assert(client_span) + assert(test_span) + + # Same traceId + self.assertEqual(server_span.t, client_span.t) + self.assertEqual(server_span.t, test_span.t) + + # Parent relationships + self.assertEqual(server_span.p, client_span.s) + self.assertEqual(client_span.p, test_span.s) + + # Error logging + self.assertFalse(test_span.error) + self.assertIsNone(test_span.ec) + self.assertFalse(client_span.error) + self.assertIsNone(client_span.ec) + self.assertFalse(server_span.error) + self.assertIsNone(server_span.ec) + + # rpc-server + self.assertEqual(server_span.n, 'rpc-server') + self.assertEqual(server_span.k, 1) + self.assertIsNotNone(server_span.stack) + self.assertEqual(2, len(server_span.stack)) + self.assertEqual(server_span.data.rpc.flavor, 'grpc') + self.assertEqual(server_span.data.rpc.call, '/stan.Stan/ManyQuestionsManyReponses') + self.assertEqual(server_span.data.rpc.host, testenv["grpc_host"]) + self.assertEqual(server_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertIsNone(server_span.data.rpc.error) + + # rpc-client + self.assertEqual(client_span.n, 'rpc-client') + self.assertEqual(client_span.k, 2) + self.assertIsNotNone(client_span.stack) + self.assertEqual(client_span.data.rpc.flavor, 'grpc') + self.assertEqual(client_span.data.rpc.call, '/stan.Stan/ManyQuestionsManyReponses') + self.assertEqual(client_span.data.rpc.host, testenv["grpc_host"]) + self.assertEqual(client_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertEqual(client_span.data.rpc.call_type, 'stream') + self.assertIsNone(client_span.data.rpc.error) + + # test-span + self.assertEqual(test_span.n, 'sdk') + self.assertEqual(test_span.data.sdk.name, 'test') + + def test_unary_one_to_one_with_call(self): + with tracer.start_active_span('test'): + response = self.server_stub.OneQuestionOneResponse.with_call(stan_pb2.QuestionRequest(question="Are you there?")) + + self.assertIsNone(tracer.active_span) + self.assertIsNotNone(response) + self.assertEqual(type(response), tuple) + self.assertEqual(response[0].answer, "Invention, my dear friends, is 93% perspiration, 6% electricity, 4% evaporation, and 2% butterscotch ripple. – Willy Wonka") + + spans = self.recorder.queued_spans() + self.assertEqual(3, len(spans)) + + server_span = get_first_span_by_name(spans, 'rpc-server') + client_span = get_first_span_by_name(spans, 'rpc-client') + test_span = get_first_span_by_name(spans, 'sdk') + + assert(server_span) + assert(client_span) + assert(test_span) + + # Same traceId + self.assertEqual(server_span.t, client_span.t) + self.assertEqual(server_span.t, test_span.t) + + # Parent relationships + self.assertEqual(server_span.p, client_span.s) + self.assertEqual(client_span.p, test_span.s) + + # Error logging + self.assertFalse(test_span.error) + self.assertIsNone(test_span.ec) + self.assertFalse(client_span.error) + self.assertIsNone(client_span.ec) + self.assertFalse(server_span.error) + self.assertIsNone(server_span.ec) + + # rpc-server + self.assertEqual(server_span.n, 'rpc-server') + self.assertEqual(server_span.k, 1) + self.assertIsNotNone(server_span.stack) + self.assertEqual(2, len(server_span.stack)) + self.assertEqual(server_span.data.rpc.flavor, 'grpc') + self.assertEqual(server_span.data.rpc.call, '/stan.Stan/OneQuestionOneResponse') + self.assertEqual(server_span.data.rpc.host, testenv["grpc_host"]) + self.assertEqual(server_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertIsNone(server_span.data.rpc.error) + + # rpc-client + self.assertEqual(client_span.n, 'rpc-client') + self.assertEqual(client_span.k, 2) + self.assertIsNotNone(client_span.stack) + self.assertEqual(client_span.data.rpc.flavor, 'grpc') + self.assertEqual(client_span.data.rpc.call, '/stan.Stan/OneQuestionOneResponse') + self.assertEqual(client_span.data.rpc.host, testenv["grpc_host"]) + self.assertEqual(client_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertEqual(client_span.data.rpc.call_type, 'unary') + self.assertIsNone(client_span.data.rpc.error) + + # test-span + self.assertEqual(test_span.n, 'sdk') + self.assertEqual(test_span.data.sdk.name, 'test') + + def test_streaming_many_to_one_with_call(self): + with tracer.start_active_span('test'): + response = self.server_stub.ManyQuestionsOneResponse.with_call(self.generate_questions()) + + self.assertIsNone(tracer.active_span) + self.assertIsNotNone(response) + + self.assertEqual('Ok', response[0].answer) + self.assertEqual(True, response[0].was_answered) + + spans = self.recorder.queued_spans() + self.assertEqual(3, len(spans)) + + server_span = get_first_span_by_name(spans, 'rpc-server') + client_span = get_first_span_by_name(spans, 'rpc-client') + test_span = get_first_span_by_name(spans, 'sdk') + + assert(server_span) + assert(client_span) + assert(test_span) + + # Same traceId + self.assertEqual(server_span.t, client_span.t) + self.assertEqual(server_span.t, test_span.t) + + # Parent relationships + self.assertEqual(server_span.p, client_span.s) + self.assertEqual(client_span.p, test_span.s) + + # Error logging + self.assertFalse(test_span.error) + self.assertIsNone(test_span.ec) + self.assertFalse(client_span.error) + self.assertIsNone(client_span.ec) + self.assertFalse(server_span.error) + self.assertIsNone(server_span.ec) + + # rpc-server + self.assertEqual(server_span.n, 'rpc-server') + self.assertEqual(server_span.k, 1) + self.assertIsNotNone(server_span.stack) + self.assertEqual(2, len(server_span.stack)) + self.assertEqual(server_span.data.rpc.flavor, 'grpc') + self.assertEqual(server_span.data.rpc.call, '/stan.Stan/ManyQuestionsOneResponse') + self.assertEqual(server_span.data.rpc.host, testenv["grpc_host"]) + self.assertEqual(server_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertIsNone(server_span.data.rpc.error) + + # rpc-client + self.assertEqual(client_span.n, 'rpc-client') + self.assertEqual(client_span.k, 2) + self.assertIsNotNone(client_span.stack) + self.assertEqual(client_span.data.rpc.flavor, 'grpc') + self.assertEqual(client_span.data.rpc.call, '/stan.Stan/ManyQuestionsOneResponse') + self.assertEqual(client_span.data.rpc.host, testenv["grpc_host"]) + self.assertEqual(client_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertEqual(client_span.data.rpc.call_type, 'stream') + self.assertIsNone(client_span.data.rpc.error) + + # test-span + self.assertEqual(test_span.n, 'sdk') + self.assertEqual(test_span.data.sdk.name, 'test') + + def test_async_unary(self): + def process_response(future): + result = future.result() + self.assertEqual(type(result), stan_pb2.QuestionResponse) + self.assertTrue(result.was_answered) + self.assertEqual(result.answer, "Invention, my dear friends, is 93% perspiration, 6% electricity, 4% evaporation, and 2% butterscotch ripple. – Willy Wonka") + + with tracer.start_active_span('test'): + future = self.server_stub.OneQuestionOneResponse.future( + stan_pb2.QuestionRequest(question="Are you there?")) + future.add_done_callback(process_response) + time.sleep(0.7) + + self.assertIsNone(tracer.active_span) + spans = self.recorder.queued_spans() + self.assertEqual(3, len(spans)) + + server_span = get_first_span_by_name(spans, 'rpc-server') + client_span = get_first_span_by_name(spans, 'rpc-client') + test_span = get_first_span_by_name(spans, 'sdk') + + assert(server_span) + assert(client_span) + assert(test_span) + + # Same traceId + self.assertEqual(server_span.t, client_span.t) + self.assertEqual(server_span.t, test_span.t) + + # Parent relationships + self.assertEqual(server_span.p, client_span.s) + self.assertEqual(client_span.p, test_span.s) + + # Error logging + self.assertFalse(test_span.error) + self.assertIsNone(test_span.ec) + self.assertFalse(client_span.error) + self.assertIsNone(client_span.ec) + self.assertFalse(server_span.error) + self.assertIsNone(server_span.ec) + + # rpc-server + self.assertEqual(server_span.n, 'rpc-server') + self.assertEqual(server_span.k, 1) + self.assertIsNotNone(server_span.stack) + self.assertEqual(2, len(server_span.stack)) + self.assertEqual(server_span.data.rpc.flavor, 'grpc') + self.assertEqual(server_span.data.rpc.call, '/stan.Stan/OneQuestionOneResponse') + self.assertEqual(server_span.data.rpc.host, testenv["grpc_host"]) + self.assertEqual(server_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertIsNone(server_span.data.rpc.error) + + # rpc-client + self.assertEqual(client_span.n, 'rpc-client') + self.assertEqual(client_span.k, 2) + self.assertIsNotNone(client_span.stack) + self.assertEqual(client_span.data.rpc.flavor, 'grpc') + self.assertEqual(client_span.data.rpc.call, '/stan.Stan/OneQuestionOneResponse') + self.assertEqual(client_span.data.rpc.host, testenv["grpc_host"]) + self.assertEqual(client_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertEqual(client_span.data.rpc.call_type, 'unary') + self.assertIsNone(client_span.data.rpc.error) + + # test-span + self.assertEqual(test_span.n, 'sdk') + self.assertEqual(test_span.data.sdk.name, 'test') + + def test_async_stream(self): + def process_response(future): + result = future.result() + self.assertEqual(type(result), stan_pb2.QuestionResponse) + self.assertTrue(result.was_answered) + self.assertEqual(result.answer, 'Ok') + + with tracer.start_active_span('test'): + future = self.server_stub.ManyQuestionsOneResponse.future(self.generate_questions()) + future.add_done_callback(process_response) + + # The question generator delays at random intervals between questions so to assure that + # all questions are sent and processed before we start testing the results. + time.sleep(5) + + self.assertIsNone(tracer.active_span) + spans = self.recorder.queued_spans() + self.assertEqual(3, len(spans)) + + server_span = get_first_span_by_name(spans, 'rpc-server') + client_span = get_first_span_by_name(spans, 'rpc-client') + test_span = get_first_span_by_name(spans, 'sdk') + + assert(server_span) + assert(client_span) + assert(test_span) + + # Same traceId + self.assertEqual(server_span.t, client_span.t) + self.assertEqual(server_span.t, test_span.t) + + # Parent relationships + self.assertEqual(server_span.p, client_span.s) + self.assertEqual(client_span.p, test_span.s) + + # Error logging + self.assertFalse(test_span.error) + self.assertIsNone(test_span.ec) + self.assertFalse(client_span.error) + self.assertIsNone(client_span.ec) + self.assertFalse(server_span.error) + self.assertIsNone(server_span.ec) + + # rpc-server + self.assertEqual(server_span.n, 'rpc-server') + self.assertEqual(server_span.k, 1) + self.assertIsNotNone(server_span.stack) + self.assertEqual(2, len(server_span.stack)) + self.assertEqual(server_span.data.rpc.flavor, 'grpc') + self.assertEqual(server_span.data.rpc.call, '/stan.Stan/ManyQuestionsOneResponse') + self.assertEqual(server_span.data.rpc.host, testenv["grpc_host"]) + self.assertEqual(server_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertIsNone(server_span.data.rpc.error) + + # rpc-client + self.assertEqual(client_span.n, 'rpc-client') + self.assertEqual(client_span.k, 2) + self.assertIsNotNone(client_span.stack) + self.assertEqual(client_span.data.rpc.flavor, 'grpc') + self.assertEqual(client_span.data.rpc.call, '/stan.Stan/ManyQuestionsOneResponse') + self.assertEqual(client_span.data.rpc.host, testenv["grpc_host"]) + self.assertEqual(client_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertEqual(client_span.data.rpc.call_type, 'stream') + self.assertIsNone(client_span.data.rpc.error) + + # test-span + self.assertEqual(test_span.n, 'sdk') + self.assertEqual(test_span.data.sdk.name, 'test') + + def test_server_error(self): + try: + response = None + with tracer.start_active_span('test'): + response = self.server_stub.OneQuestionOneErrorResponse(stan_pb2.QuestionRequest(question="Do u error?")) + except: + pass + + self.assertIsNone(tracer.active_span) + self.assertIsNone(response) + + spans = self.recorder.queued_spans() + self.assertEqual(4, len(spans)) + + log_span = get_first_span_by_name(spans, 'log') + server_span = get_first_span_by_name(spans, 'rpc-server') + client_span = get_first_span_by_name(spans, 'rpc-client') + test_span = get_first_span_by_name(spans, 'sdk') + + assert(log_span) + assert(server_span) + assert(client_span) + assert(test_span) + + # Same traceId + self.assertEqual(server_span.t, client_span.t) + self.assertEqual(server_span.t, test_span.t) + + # Parent relationships + self.assertEqual(server_span.p, client_span.s) + self.assertEqual(client_span.p, test_span.s) + + # Error logging + self.assertFalse(test_span.error) + self.assertIsNone(test_span.ec) + self.assertTrue(client_span.error) + self.assertEqual(client_span.ec, 1) + self.assertFalse(server_span.error) + self.assertIsNone(server_span.ec) + + # rpc-server + self.assertEqual(server_span.n, 'rpc-server') + self.assertEqual(server_span.k, 1) + self.assertIsNotNone(server_span.stack) + self.assertEqual(2, len(server_span.stack)) + self.assertEqual(server_span.data.rpc.flavor, 'grpc') + self.assertEqual(server_span.data.rpc.call, '/stan.Stan/OneQuestionOneErrorResponse') + self.assertEqual(server_span.data.rpc.host, testenv["grpc_host"]) + self.assertEqual(server_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertIsNone(server_span.data.rpc.error) + + # rpc-client + self.assertEqual(client_span.n, 'rpc-client') + self.assertEqual(client_span.k, 2) + self.assertIsNotNone(client_span.stack) + self.assertEqual(client_span.data.rpc.flavor, 'grpc') + self.assertEqual(client_span.data.rpc.call, '/stan.Stan/OneQuestionOneErrorResponse') + self.assertEqual(client_span.data.rpc.host, testenv["grpc_host"]) + self.assertEqual(client_span.data.rpc.port, str(testenv["grpc_port"])) + self.assertEqual(client_span.data.rpc.call_type, 'unary') + self.assertIsNotNone(client_span.data.rpc.error) + + # log + self.assertEqual(log_span.n, 'log') + self.assertIsNotNone(log_span.data.log) + self.assertEqual(log_span.data.log['message'], 'Exception calling application: Simulated error') + + # test-span + self.assertEqual(test_span.n, 'sdk') + self.assertEqual(test_span.data.sdk.name, 'test') diff --git a/tests/test_ot_propagators.py b/tests/test_ot_propagators.py index 7417b3d8..e6c4742f 100644 --- a/tests/test_ot_propagators.py +++ b/tests/test_ot_propagators.py @@ -6,7 +6,7 @@ import instana.http_propagator as ihp import instana.text_propagator as itp -from instana import options, util +from instana import options, span_context from instana.tracer import InstanaTracer @@ -58,7 +58,7 @@ def test_http_basic_extract(): carrier = {'X-Instana-T': '1', 'X-Instana-S': '1', 'X-Instana-L': '1'} ctx = ot.tracer.extract(ot.Format.HTTP_HEADERS, carrier) - assert type(ctx) is basictracer.context.SpanContext + assert type(ctx) is span_context.InstanaSpanContext assert_equals('0000000000000001', ctx.trace_id) assert_equals('0000000000000001', ctx.span_id) @@ -70,7 +70,7 @@ def test_http_mixed_case_extract(): carrier = {'x-insTana-T': '1', 'X-inSTANa-S': '1', 'X-INstana-l': '1'} ctx = ot.tracer.extract(ot.Format.HTTP_HEADERS, carrier) - assert type(ctx) is basictracer.context.SpanContext + assert type(ctx) is span_context.InstanaSpanContext assert_equals('0000000000000001', ctx.trace_id) assert_equals('0000000000000001', ctx.span_id) @@ -93,7 +93,7 @@ def test_http_128bit_headers(): 'X-Instana-S': '0000000000000000b0789916ff8f319f', 'X-Instana-L': '1'} ctx = ot.tracer.extract(ot.Format.HTTP_HEADERS, carrier) - assert type(ctx) is basictracer.context.SpanContext + assert type(ctx) is span_context.InstanaSpanContext assert_equals('b0789916ff8f319f', ctx.trace_id) assert_equals('b0789916ff8f319f', ctx.span_id) @@ -146,7 +146,7 @@ def test_text_basic_extract(): carrier = {'X-INSTANA-T': '1', 'X-INSTANA-S': '1', 'X-INSTANA-L': '1'} ctx = ot.tracer.extract(ot.Format.TEXT_MAP, carrier) - assert type(ctx) is basictracer.context.SpanContext + assert type(ctx) is span_context.InstanaSpanContext assert_equals('0000000000000001', ctx.trace_id) assert_equals('0000000000000001', ctx.span_id) @@ -179,6 +179,6 @@ def test_text_128bit_headers(): 'X-INSTANA-S': ' 0000000000000000b0789916ff8f319f', 'X-INSTANA-L': '1'} ctx = ot.tracer.extract(ot.Format.TEXT_MAP, carrier) - assert type(ctx) is basictracer.context.SpanContext + assert type(ctx) is span_context.InstanaSpanContext assert_equals('b0789916ff8f319f', ctx.trace_id) assert_equals('b0789916ff8f319f', ctx.span_id)