Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env-test
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export RABBITMQ_HOST="192.168.201.129"
6 changes: 5 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,9 @@ before_install:

install: "pip install -r requirements-test.txt"

sudo: required

script: nosetests -v
services:
- rabbitmq

script: python runtests.py
1 change: 1 addition & 0 deletions instana/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def load(module):
def load_instrumentation():
if "INSTANA_DISABLE_AUTO_INSTR" not in os.environ:
# Import & initialize instrumentation
from .instrumentation import asynqp # noqa
from .instrumentation import urllib3 # noqa
from .instrumentation import sudsjurko # noqa
from .instrumentation import mysqlpython # noqa
Expand Down
2 changes: 1 addition & 1 deletion instana/http_propagator.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def extract(self, carrier): # noqa
raise ot.SpanContextCorruptedException()

# Look for standard X-Instana-T/S format
if self.HEADER_KEY_T in dc and self.header_key_s in dc:
if self.HEADER_KEY_T in dc and self.HEADER_KEY_S in dc:
trace_id = header_to_id(dc[self.HEADER_KEY_T])
span_id = header_to_id(dc[self.HEADER_KEY_S])

Expand Down
97 changes: 97 additions & 0 deletions instana/instrumentation/asynqp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
from __future__ import absolute_import

import opentracing
import opentracing.ext.tags as ext
import wrapt

from ..log import logger
from ..singletons import tracer

try:
import asyncio
import asynqp

@wrapt.patch_function_wrapper('asynqp.exchange','Exchange.publish')
def publish_with_instana(wrapped, instance, args, kwargs):
parent_span = tracer.active_span

# If we're not tracing, just return
if parent_span is None:
return wrapped(*args, **kwargs)

with tracer.start_active_span("rabbitmq", child_of=parent_span) as scope:
host, port = instance.sender.protocol.transport._sock.getsockname()

msg = args[0]
if msg.headers is None:
msg.headers = {}
tracer.inject(scope.span.context, opentracing.Format.HTTP_HEADERS, msg.headers)

try:
scope.span.set_tag("exchange", instance.name)
scope.span.set_tag("sort", "publish")
scope.span.set_tag("address", host + ":" + str(port) )
scope.span.set_tag("key", args[1])

rv = wrapped(*args, **kwargs)
except Exception as e:
scope.span.log_kv({'message': e})
scope.span.set_tag("error", True)
ec = scope.span.tags.get('ec', 0)
scope.span.set_tag("ec", ec+1)
raise
else:
return rv

@wrapt.patch_function_wrapper('asynqp.queue','Queue.get')
def get_with_instana(wrapped, instance, args, kwargs):
parent_span = tracer.active_span

with tracer.start_active_span("rabbitmq", child_of=parent_span) as scope:
host, port = instance.sender.protocol.transport._sock.getsockname()

try:
scope.span.set_tag("queue", instance.name)
scope.span.set_tag("sort", "consume")
scope.span.set_tag("address", host + ":" + str(port) )

rv = wrapped(*args, **kwargs)
except Exception as e:
scope.span.log_kv({'message': e})
scope.span.set_tag("error", True)
ec = scope.span.tags.get('ec', 0)
scope.span.set_tag("ec", ec+1)
raise
else:
return rv

@wrapt.patch_function_wrapper('asynqp.queue','Consumers.deliver')
def deliver_with_instana(wrapped, instance, args, kwargs):

ctx = None
msg = args[1]
if 'X-Instana-T' in msg.headers and 'X-Instana-S' in msg.headers:
ctx = tracer.extract(opentracing.Format.HTTP_HEADERS, dict(msg.headers))

with tracer.start_active_span("rabbitmq", child_of=ctx) as scope:
host, port = args[1].sender.protocol.transport._sock.getsockname()

try:
scope.span.set_tag("exchange", msg.exchange_name)
scope.span.set_tag("sort", "consume")
scope.span.set_tag("address", host + ":" + str(port) )
scope.span.set_tag("key", msg.routing_key)

rv = wrapped(*args, **kwargs)
except Exception as e:
scope.span.log_kv({'message': e})
scope.span.set_tag("error", True)
ec = scope.span.tags.get('ec', 0)
scope.span.set_tag("ec", ec+1)
raise
else:
return rv

logger.debug("Instrumenting asynqp")
except ImportError:
pass
38 changes: 25 additions & 13 deletions instana/json_span.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,50 +17,62 @@ def __init__(self, **kwds):
self.__dict__[key] = kwds[key]


class CustomData(object):
tags = None
logs = None

def __init__(self, **kwds):
self.__dict__.update(kwds)


class Data(object):
service = None
http = None
baggage = None
custom = None
sdk = None
soap = None
rabbitmq = None

def __init__(self, **kwds):
self.__dict__.update(kwds)


class MySQLData(object):
db = None
class HttpData(object):
host = None
user = None
stmt = None
url = None
status = 0
method = None
error = None

def __init__(self, **kwds):
self.__dict__.update(kwds)


class HttpData(object):
class MySQLData(object):
db = None
host = None
url = None
status = 0
method = None
user = None
stmt = None
error = None

def __init__(self, **kwds):
self.__dict__.update(kwds)


class SoapData(object):
action = None
class RabbitmqData(object):
exchange = None
queue = None
sort = None
address = None
key = None

def __init__(self, **kwds):
self.__dict__.update(kwds)


class CustomData(object):
tags = None
logs = None
class SoapData(object):
action = None

def __init__(self, **kwds):
self.__dict__.update(kwds)
Expand Down
32 changes: 24 additions & 8 deletions instana/recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import instana.singletons

from .json_span import (CustomData, Data, HttpData, JsonSpan, MySQLData,
SDKData, SoapData)
RabbitmqData, SDKData, SoapData)
from .log import logger

if sys.version_info.major is 2:
Expand All @@ -22,12 +22,12 @@


class InstanaRecorder(SpanRecorder):
registered_spans = ("django", "memcache", "mysql", "rpc-client",
registered_spans = ("django", "memcache", "mysql", "rabbitmq", "rpc-client",
"rpc-server", "soap", "urllib3", "wsgi")
http_spans = ("django", "wsgi", "urllib3", "soap")

exit_spans = ("memcache", "mysql", "rpc-client", "soap", "urllib3")
entry_spans = ("django", "wsgi", "rpc-server")
exit_spans = ("memcache", "mysql", "rabbitmq", "rpc-client", "soap", "urllib3")
entry_spans = ("django", "wsgi", "rabbitmq", "rpc-server")

entry_kind = ["entry", "server", "consumer"]
exit_kind = ["exit", "client", "producer"]
Expand Down Expand Up @@ -91,9 +91,13 @@ def record_span(self, span):

def build_registered_span(self, span):
""" Takes a BasicSpan and converts it into a registered JsonSpan """
data = Data(baggage=span.context.baggage,
custom=CustomData(tags=span.tags,
logs=self.collect_logs(span)))
data = Data(baggage=span.context.baggage)

logs = self.collect_logs(span)
if len(logs) > 0:
if data.custom is None:
data.custom = CustomData()
data.custom.logs = logs

if span.operation_name in self.http_spans:
data.http = HttpData(host=self.get_http_host_name(span),
Expand All @@ -102,6 +106,13 @@ def build_registered_span(self, span):
status=span.tags.pop(ext.HTTP_STATUS_CODE, None),
error=span.tags.pop('http.error', None))

if span.operation_name == "rabbitmq":
data.rabbitmq = RabbitmqData(exchange=span.tags.pop('exchange', None),
queue=span.tags.pop('queue', None),
sort=span.tags.pop('sort', None),
address=span.tags.pop('address', None),
key=span.tags.pop('key', None))

if span.operation_name == "soap":
data.soap = SoapData(action=span.tags.pop('soap.action', None))

Expand All @@ -110,10 +121,15 @@ def build_registered_span(self, span):
db=span.tags.pop(ext.DATABASE_INSTANCE, None),
user=span.tags.pop(ext.DATABASE_USER, None),
stmt=span.tags.pop(ext.DATABASE_STATEMENT, None))
if len(data.custom.logs.keys()):
if (data.custom is not None) and (data.custom.logs is not None) and len(data.custom.logs):
tskey = list(data.custom.logs.keys())[0]
data.mysql.error = data.custom.logs[tskey]['message']

if len(span.tags) > 0:
if data.custom is None:
data.custom = CustomData()
data.custom.tags = span.tags

entityFrom = {'e': instana.singletons.agent.from_.pid,
'h': instana.singletons.agent.from_.agentUuid}

Expand Down
8 changes: 4 additions & 4 deletions instana/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,13 @@ def start_span(self,
tags=tags,
start_time=start_time)

if operation_name in self.recorder.entry_spans:
# For entry spans, add only a backtrace fingerprint
self.__add_stack(span, limit=2)

if operation_name in self.recorder.exit_spans:
self.__add_stack(span)

elif operation_name in self.recorder.entry_spans:
# For entry spans, add only a backtrace fingerprint
self.__add_stack(span, limit=2)

return span

def inject(self, span_context, format, carrier):
Expand Down
10 changes: 10 additions & 0 deletions runtests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import sys
import nose
from distutils.version import LooseVersion

args = ['nosetests', '-v']

if (LooseVersion(sys.version) <= LooseVersion('3.5')):
args.extend(['-e', 'asynqp'])

result = nose.run(argv=args)
3 changes: 3 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,15 @@ def check_setuptools():
},
extras_require={
'test': [
'asynqp>=0.4',
'django>=1.11',
'nose>=1.0',
'flask>=0.12.2',
'lxml>=3.4',
'mock>=2.0.0',
'MySQL-python>=1.2.5;python_version<="2.7"',
'pyOpenSSL>=16.1.0;python_version<="2.7"',
'pytest>=3.0.1',
'requests>=2.17.1',
'urllib3[secure]>=1.15',
'spyne>=2.9',
Expand Down
Loading