Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,22 @@
version: '2'
services:
redis:
image: redis:4.0.6
image: 'bitnami/redis:latest'
environment:
- ALLOW_EMPTY_PASSWORD=yes
ports:
- 6379:6379

#
# Dev: Optionally enable to validate Redis Sentinel
#
# redis-sentinel:
# image: 'bitnami/redis-sentinel:latest'
# environment:
# - REDIS_MASTER_HOST=redis
# ports:
# - '26379:26379'

# Kafka test will sometimes fail because Zookeeper won't start due to
# java.io.IOException: Unable to create data directory /opt/zookeeper-3.4.9/data/version-2, which seems to be a known issue:
# -> https://issues.apache.org/jira/browse/ZOOKEEPER-1936
Expand Down Expand Up @@ -55,4 +67,4 @@ services:
image: rabbitmq:3.7.8-alpine
ports:
- 5671:5671
- 5672:5672
- 5672:5672
16 changes: 4 additions & 12 deletions instana/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from .fsm import TheMachine
from .log import logger
from .sensor import Sensor
from .util import to_json


class From(object):
Expand Down Expand Up @@ -86,15 +87,6 @@ def reset(self):
# Will schedule a restart of the announce cycle in the future
self.machine.reset()

def to_json(self, o):
def extractor(o):
return {k.lower(): v for k, v in o.__dict__.items() if v is not None}

try:
return json.dumps(o, default=extractor, sort_keys=False, separators=(',', ':')).encode()
except Exception:
logger.debug("to_json", exc_info=True)

def is_timed_out(self):
if self.last_seen and self.can_send:
diff = datetime.now() - self.last_seen
Expand Down Expand Up @@ -165,7 +157,7 @@ def announce(self, discovery):
logger.debug("making announce request to %s", url)
response = None
response = self.client.put(url,
data=self.to_json(discovery),
data=to_json(discovery),
headers={"Content-Type": "application/json"},
timeout=0.8)

Expand Down Expand Up @@ -196,7 +188,7 @@ def report_data(self, entity_data):
try:
response = None
response = self.client.post(self.__data_url(),
data=self.to_json(entity_data),
data=to_json(entity_data),
headers={"Content-Type": "application/json"},
timeout=0.8)

Expand All @@ -221,7 +213,7 @@ def report_traces(self, spans):

response = None
response = self.client.post(self.__traces_url(),
data=self.to_json(spans),
data=to_json(spans),
headers={"Content-Type": "application/json"},
timeout=0.8)

Expand Down
67 changes: 37 additions & 30 deletions instana/instrumentation/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,44 @@

if ((redis.VERSION >= (2, 10, 6)) and (redis.VERSION < (3, 0, 0))):

def collect_tags(span, instance, args, kwargs):
try:
ckw = instance.connection_pool.connection_kwargs

span.set_tag("driver", "redis-py")

host = ckw.get('host', None)
port = ckw.get('port', '6379')
db = ckw.get('db', None)

if host is not None:
url = "redis://%s:%s" % (host, port)
if db is not None:
url = url + "/%s" % db
span.set_tag('connection', url)

except:
logger.debug("redis.collect_tags non-fatal error", exc_info=True)
finally:
return span

@wrapt.patch_function_wrapper('redis.client','StrictRedis.execute_command')
def execute_command_with_instana(wrapped, instance, args, kwargs):
parent_span = tracer.active_span

# If we're not tracing, just return
if parent_span is None:
if parent_span is None or parent_span.operation_name == "redis":
return wrapped(*args, **kwargs)

with tracer.start_active_span("redis", child_of=parent_span) as scope:

try:
ckw = instance.connection_pool.connection_kwargs
url = "redis://%s:%d/%d" % (ckw['host'], ckw['port'], ckw['db'])
scope.span.set_tag("connection", url)
scope.span.set_tag("driver", "redis-py")
scope.span.set_tag("command", args[0])
collect_tags(scope.span, instance, args, kwargs)
if (len(args) > 0):
scope.span.set_tag("command", args[0])

rv = wrapped(*args, **kwargs)
except Exception as e:
scope.span.set_tag("redis.error", str(e))
scope.span.set_tag("error", True)
ec = scope.span.tags.get('ec', 0)
scope.span.set_tag("ec", ec+1)
scope.span.log_exception(e)
raise
else:
return rv
Expand All @@ -42,34 +57,26 @@ def execute_with_instana(wrapped, instance, args, kwargs):
parent_span = tracer.active_span

# If we're not tracing, just return
if parent_span is None:
if parent_span is None or parent_span.operation_name == "redis":
return wrapped(*args, **kwargs)

with tracer.start_active_span("redis", child_of=parent_span) as scope:

try:
ckw = instance.connection_pool.connection_kwargs
url = "redis://%s:%d/%d" % (ckw['host'], ckw['port'], ckw['db'])
scope.span.set_tag("connection", url)
scope.span.set_tag("driver", "redis-py")
collect_tags(scope.span, instance, args, kwargs)
scope.span.set_tag("command", 'PIPELINE')

try:
pipe_cmds = []
for e in instance.command_stack:
pipe_cmds.append(e[0][0])
scope.span.set_tag("subCommands", pipe_cmds)
except Exception as e:
# If anything breaks during cmd collection, just log a
# debug message
logger.debug("Error collecting pipeline commands")
pipe_cmds = []
for e in instance.command_stack:
pipe_cmds.append(e[0][0])
scope.span.set_tag("subCommands", pipe_cmds)
except Exception as e:
# If anything breaks during K/V collection, just log a debug message
logger.debug("Error collecting pipeline commands", exc_info=True)

try:
rv = wrapped(*args, **kwargs)
except Exception as e:
scope.span.set_tag("redis.error", str(e))
scope.span.set_tag("error", True)
ec = scope.span.tags.get('ec', 0)
scope.span.set_tag("ec", ec+1)
scope.span.log_exception(e)
raise
else:
return rv
Expand Down
18 changes: 14 additions & 4 deletions instana/recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,14 +328,24 @@ def get_span_kind_as_int(self, span):
return kind

def collect_logs(self, span):
"""
Collect up log data and feed it to the Instana brain.

:param span: The span to search for logs in
:return: Logs ready for consumption by the Instana brain.
"""
logs = {}
for l in span.logs:
ts = int(round(l.timestamp * 1000))
for log in span.logs:
ts = int(round(log.timestamp * 1000))
if ts not in logs:
logs[ts] = {}

for f in l.key_values:
logs[ts][f] = l.key_values[f]
if 'message' in log.key_values:
logs[ts]['message'] = log.key_values['message']
if 'event' in log.key_values:
logs[ts]['event'] = log.key_values['event']
if 'parameters' in log.key_values:
logs[ts]['parameters'] = log.key_values['parameters']

return logs

Expand Down
12 changes: 9 additions & 3 deletions instana/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,16 @@ def to_json(obj):
:return: json string
"""
try:
return json.dumps(obj, default=lambda obj: {k.lower(): v for k, v in obj.__dict__.items()},
sort_keys=False, separators=(',', ':')).encode()
def extractor(o):
if not hasattr(o, '__dict__'):
logger.debug("Couldn't serialize non dict type: %s", type(o))
return {}
else:
return {k.lower(): v for k, v in o.__dict__.items() if v is not None}

return json.dumps(obj, default=extractor, sort_keys=False, separators=(',', ':')).encode()
except Exception:
logger.debug("to_json: ", exc_info=True)
logger.debug("to_json non-fatal encoding issue: ", exc_info=True)


def package_version():
Expand Down
2 changes: 1 addition & 1 deletion instana/wsgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def new_start_response(status, headers, exc_info=None):
self.scope.span.set_tag("http.%s" % custom_header, env[wsgi_header])

if 'PATH_INFO' in env:
self.scope.span.set_tag(tags.HTTP_URL, env['PATH_INFO'])
self.scope.span.set_tag('http.path', env['PATH_INFO'])
if 'QUERY_STRING' in env and len(env['QUERY_STRING']):
scrubbed_params = strip_secrets(env['QUERY_STRING'], agent.secrets_matcher, agent.secrets_list)
self.scope.span.set_tag("http.params", scrubbed_params)
Expand Down
5 changes: 1 addition & 4 deletions tests/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@
"""
Redis Environment
"""
if 'REDIS' in os.environ:
testenv['redis_url']= os.environ['REDIS']
else:
testenv['redis_url'] = '127.0.0.1:6379'
testenv['redis_host'] = os.environ.get('REDIS_HOST', '127.0.0.1')


def get_first_span_by_name(spans, name):
Expand Down
Loading