diff --git a/docker-compose.yml b/docker-compose.yml index d7c63d2f..40fc12d3 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,10 +1,22 @@ version: '2' services: redis: - image: redis:4.0.6 + image: 'bitnami/redis:latest' + environment: + - ALLOW_EMPTY_PASSWORD=yes ports: - 6379:6379 +# +# Dev: Optionally enable to validate Redis Sentinel +# +# redis-sentinel: +# image: 'bitnami/redis-sentinel:latest' +# environment: +# - REDIS_MASTER_HOST=redis +# ports: +# - '26379:26379' + # Kafka test will sometimes fail because Zookeeper won't start due to # java.io.IOException: Unable to create data directory /opt/zookeeper-3.4.9/data/version-2, which seems to be a known issue: # -> https://issues.apache.org/jira/browse/ZOOKEEPER-1936 @@ -55,4 +67,4 @@ services: image: rabbitmq:3.7.8-alpine ports: - 5671:5671 - - 5672:5672 \ No newline at end of file + - 5672:5672 diff --git a/instana/agent.py b/instana/agent.py index 1341baae..29965079 100644 --- a/instana/agent.py +++ b/instana/agent.py @@ -14,6 +14,7 @@ from .fsm import TheMachine from .log import logger from .sensor import Sensor +from .util import to_json class From(object): @@ -86,15 +87,6 @@ def reset(self): # Will schedule a restart of the announce cycle in the future self.machine.reset() - def to_json(self, o): - def extractor(o): - return {k.lower(): v for k, v in o.__dict__.items() if v is not None} - - try: - return json.dumps(o, default=extractor, sort_keys=False, separators=(',', ':')).encode() - except Exception: - logger.debug("to_json", exc_info=True) - def is_timed_out(self): if self.last_seen and self.can_send: diff = datetime.now() - self.last_seen @@ -165,7 +157,7 @@ def announce(self, discovery): logger.debug("making announce request to %s", url) response = None response = self.client.put(url, - data=self.to_json(discovery), + data=to_json(discovery), headers={"Content-Type": "application/json"}, timeout=0.8) @@ -196,7 +188,7 @@ def report_data(self, entity_data): try: response = None response = self.client.post(self.__data_url(), - data=self.to_json(entity_data), + data=to_json(entity_data), headers={"Content-Type": "application/json"}, timeout=0.8) @@ -221,7 +213,7 @@ def report_traces(self, spans): response = None response = self.client.post(self.__traces_url(), - data=self.to_json(spans), + data=to_json(spans), headers={"Content-Type": "application/json"}, timeout=0.8) diff --git a/instana/instrumentation/redis.py b/instana/instrumentation/redis.py index 8c95ec55..5519cc46 100644 --- a/instana/instrumentation/redis.py +++ b/instana/instrumentation/redis.py @@ -10,29 +10,44 @@ if ((redis.VERSION >= (2, 10, 6)) and (redis.VERSION < (3, 0, 0))): + def collect_tags(span, instance, args, kwargs): + try: + ckw = instance.connection_pool.connection_kwargs + + span.set_tag("driver", "redis-py") + + host = ckw.get('host', None) + port = ckw.get('port', '6379') + db = ckw.get('db', None) + + if host is not None: + url = "redis://%s:%s" % (host, port) + if db is not None: + url = url + "/%s" % db + span.set_tag('connection', url) + + except: + logger.debug("redis.collect_tags non-fatal error", exc_info=True) + finally: + return span + @wrapt.patch_function_wrapper('redis.client','StrictRedis.execute_command') def execute_command_with_instana(wrapped, instance, args, kwargs): parent_span = tracer.active_span # If we're not tracing, just return - if parent_span is None: + if parent_span is None or parent_span.operation_name == "redis": return wrapped(*args, **kwargs) with tracer.start_active_span("redis", child_of=parent_span) as scope: - try: - ckw = instance.connection_pool.connection_kwargs - url = "redis://%s:%d/%d" % (ckw['host'], ckw['port'], ckw['db']) - scope.span.set_tag("connection", url) - scope.span.set_tag("driver", "redis-py") - scope.span.set_tag("command", args[0]) + collect_tags(scope.span, instance, args, kwargs) + if (len(args) > 0): + scope.span.set_tag("command", args[0]) rv = wrapped(*args, **kwargs) except Exception as e: - scope.span.set_tag("redis.error", str(e)) - scope.span.set_tag("error", True) - ec = scope.span.tags.get('ec', 0) - scope.span.set_tag("ec", ec+1) + scope.span.log_exception(e) raise else: return rv @@ -42,34 +57,26 @@ def execute_with_instana(wrapped, instance, args, kwargs): parent_span = tracer.active_span # If we're not tracing, just return - if parent_span is None: + if parent_span is None or parent_span.operation_name == "redis": return wrapped(*args, **kwargs) with tracer.start_active_span("redis", child_of=parent_span) as scope: - try: - ckw = instance.connection_pool.connection_kwargs - url = "redis://%s:%d/%d" % (ckw['host'], ckw['port'], ckw['db']) - scope.span.set_tag("connection", url) - scope.span.set_tag("driver", "redis-py") + collect_tags(scope.span, instance, args, kwargs) scope.span.set_tag("command", 'PIPELINE') - try: - pipe_cmds = [] - for e in instance.command_stack: - pipe_cmds.append(e[0][0]) - scope.span.set_tag("subCommands", pipe_cmds) - except Exception as e: - # If anything breaks during cmd collection, just log a - # debug message - logger.debug("Error collecting pipeline commands") + pipe_cmds = [] + for e in instance.command_stack: + pipe_cmds.append(e[0][0]) + scope.span.set_tag("subCommands", pipe_cmds) + except Exception as e: + # If anything breaks during K/V collection, just log a debug message + logger.debug("Error collecting pipeline commands", exc_info=True) + try: rv = wrapped(*args, **kwargs) except Exception as e: - scope.span.set_tag("redis.error", str(e)) - scope.span.set_tag("error", True) - ec = scope.span.tags.get('ec', 0) - scope.span.set_tag("ec", ec+1) + scope.span.log_exception(e) raise else: return rv diff --git a/instana/recorder.py b/instana/recorder.py index d532532a..8c4471e0 100644 --- a/instana/recorder.py +++ b/instana/recorder.py @@ -328,14 +328,24 @@ def get_span_kind_as_int(self, span): return kind def collect_logs(self, span): + """ + Collect up log data and feed it to the Instana brain. + + :param span: The span to search for logs in + :return: Logs ready for consumption by the Instana brain. + """ logs = {} - for l in span.logs: - ts = int(round(l.timestamp * 1000)) + for log in span.logs: + ts = int(round(log.timestamp * 1000)) if ts not in logs: logs[ts] = {} - for f in l.key_values: - logs[ts][f] = l.key_values[f] + if 'message' in log.key_values: + logs[ts]['message'] = log.key_values['message'] + if 'event' in log.key_values: + logs[ts]['event'] = log.key_values['event'] + if 'parameters' in log.key_values: + logs[ts]['parameters'] = log.key_values['parameters'] return logs diff --git a/instana/util.py b/instana/util.py index f205b5bb..5868d5e9 100644 --- a/instana/util.py +++ b/instana/util.py @@ -81,10 +81,16 @@ def to_json(obj): :return: json string """ try: - return json.dumps(obj, default=lambda obj: {k.lower(): v for k, v in obj.__dict__.items()}, - sort_keys=False, separators=(',', ':')).encode() + def extractor(o): + if not hasattr(o, '__dict__'): + logger.debug("Couldn't serialize non dict type: %s", type(o)) + return {} + else: + return {k.lower(): v for k, v in o.__dict__.items() if v is not None} + + return json.dumps(obj, default=extractor, sort_keys=False, separators=(',', ':')).encode() except Exception: - logger.debug("to_json: ", exc_info=True) + logger.debug("to_json non-fatal encoding issue: ", exc_info=True) def package_version(): diff --git a/instana/wsgi.py b/instana/wsgi.py index 23a1c618..d9a939a7 100644 --- a/instana/wsgi.py +++ b/instana/wsgi.py @@ -45,7 +45,7 @@ def new_start_response(status, headers, exc_info=None): self.scope.span.set_tag("http.%s" % custom_header, env[wsgi_header]) if 'PATH_INFO' in env: - self.scope.span.set_tag(tags.HTTP_URL, env['PATH_INFO']) + self.scope.span.set_tag('http.path', env['PATH_INFO']) if 'QUERY_STRING' in env and len(env['QUERY_STRING']): scrubbed_params = strip_secrets(env['QUERY_STRING'], agent.secrets_matcher, agent.secrets_list) self.scope.span.set_tag("http.params", scrubbed_params) diff --git a/tests/helpers.py b/tests/helpers.py index 5d64a58e..389fab73 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -31,10 +31,7 @@ """ Redis Environment """ -if 'REDIS' in os.environ: - testenv['redis_url']= os.environ['REDIS'] -else: - testenv['redis_url'] = '127.0.0.1:6379' +testenv['redis_host'] = os.environ.get('REDIS_HOST', '127.0.0.1') def get_first_span_by_name(spans, name): diff --git a/tests/test_redis.py b/tests/test_redis.py index 442f70eb..746e9283 100644 --- a/tests/test_redis.py +++ b/tests/test_redis.py @@ -1,10 +1,9 @@ from __future__ import absolute_import -import os -import sys import unittest import redis +from redis.sentinel import Sentinel from .helpers import testenv from instana.singletons import tracer @@ -15,18 +14,26 @@ def setUp(self): """ Clear all spans before a test run """ self.recorder = tracer.recorder self.recorder.clear_spans() - self.strict_redis = redis.StrictRedis.from_url("redis://%s/0" % testenv['redis_url']) - self.redis = redis.Redis.from_url("redis://%s/0" % testenv['redis_url']) + + # self.sentinel = Sentinel([(testenv['redis_host'], 26379)], socket_timeout=0.1) + # self.sentinel_master = self.sentinel.discover_master('mymaster') + # self.client = redis.Redis(host=self.sentinel_master[0]) + + self.client = redis.Redis(host=testenv['redis_host']) def tearDown(self): pass + def test_vanilla(self): + self.client.set('instrument', 'piano') + result = self.client.get('instrument') + def test_set_get(self): result = None with tracer.start_active_span('test'): - self.strict_redis.set('foox', 'barX') - self.strict_redis.set('fooy', 'barY') - result = self.strict_redis.get('foox') + self.client.set('foox', 'barX') + self.client.set('fooy', 'barY') + result = self.client.get('foox') spans = self.recorder.queued_spans() self.assertEqual(4, len(spans)) @@ -66,7 +73,7 @@ def test_set_get(self): self.assertTrue('redis' in rs1_span.data.__dict__) self.assertEqual('redis-py', rs1_span.data.redis.driver) - self.assertEqual("redis://%s/0" % testenv['redis_url'], rs1_span.data.redis.connection) + self.assertEqual("redis://%s:6379/0" % testenv['redis_host'], rs1_span.data.redis.connection) self.assertEqual("SET", rs1_span.data.redis.command) self.assertIsNone(rs1_span.data.redis.error) @@ -80,7 +87,7 @@ def test_set_get(self): self.assertTrue('redis' in rs2_span.data.__dict__) self.assertEqual('redis-py', rs2_span.data.redis.driver) - self.assertEqual("redis://%s/0" % testenv['redis_url'], rs2_span.data.redis.connection) + self.assertEqual("redis://%s:6379/0" % testenv['redis_host'], rs2_span.data.redis.connection) self.assertEqual("SET", rs2_span.data.redis.command) self.assertIsNone(rs2_span.data.redis.error) @@ -94,7 +101,7 @@ def test_set_get(self): self.assertTrue('redis' in rs3_span.data.__dict__) self.assertEqual('redis-py', rs3_span.data.redis.driver) - self.assertEqual("redis://%s/0" % testenv['redis_url'], rs3_span.data.redis.connection) + self.assertEqual("redis://%s:6379/0" % testenv['redis_host'], rs3_span.data.redis.connection) self.assertEqual("GET", rs3_span.data.redis.command) self.assertIsNone(rs3_span.data.redis.error) @@ -105,9 +112,9 @@ def test_set_get(self): def test_set_incr_get(self): result = None with tracer.start_active_span('test'): - self.strict_redis.set('counter', '10') - self.strict_redis.incr('counter') - result = self.strict_redis.get('counter') + self.client.set('counter', '10') + self.client.incr('counter') + result = self.client.get('counter') spans = self.recorder.queued_spans() self.assertEqual(4, len(spans)) @@ -147,7 +154,7 @@ def test_set_incr_get(self): self.assertTrue('redis' in rs1_span.data.__dict__) self.assertEqual('redis-py', rs1_span.data.redis.driver) - self.assertEqual("redis://%s/0" % testenv['redis_url'], rs1_span.data.redis.connection) + self.assertEqual("redis://%s:6379/0" % testenv['redis_host'], rs1_span.data.redis.connection) self.assertEqual("SET", rs1_span.data.redis.command) self.assertIsNone(rs1_span.data.redis.error) @@ -161,7 +168,7 @@ def test_set_incr_get(self): self.assertTrue('redis' in rs2_span.data.__dict__) self.assertEqual('redis-py', rs2_span.data.redis.driver) - self.assertEqual("redis://%s/0" % testenv['redis_url'], rs2_span.data.redis.connection) + self.assertEqual("redis://%s:6379/0" % testenv['redis_host'], rs2_span.data.redis.connection) self.assertEqual("INCRBY", rs2_span.data.redis.command) self.assertIsNone(rs2_span.data.redis.error) @@ -175,7 +182,7 @@ def test_set_incr_get(self): self.assertTrue('redis' in rs3_span.data.__dict__) self.assertEqual('redis-py', rs3_span.data.redis.driver) - self.assertEqual("redis://%s/0" % testenv['redis_url'], rs3_span.data.redis.connection) + self.assertEqual("redis://%s:6379/0" % testenv['redis_host'], rs3_span.data.redis.connection) self.assertEqual("GET", rs3_span.data.redis.command) self.assertIsNone(rs3_span.data.redis.error) @@ -186,9 +193,9 @@ def test_set_incr_get(self): def test_old_redis_client(self): result = None with tracer.start_active_span('test'): - self.redis.set('foox', 'barX') - self.redis.set('fooy', 'barY') - result = self.redis.get('foox') + self.client.set('foox', 'barX') + self.client.set('fooy', 'barY') + result = self.client.get('foox') spans = self.recorder.queued_spans() self.assertEqual(4, len(spans)) @@ -228,7 +235,7 @@ def test_old_redis_client(self): self.assertTrue('redis' in rs1_span.data.__dict__) self.assertEqual('redis-py', rs1_span.data.redis.driver) - self.assertEqual("redis://%s/0" % testenv['redis_url'], rs1_span.data.redis.connection) + self.assertEqual("redis://%s:6379/0" % testenv['redis_host'], rs1_span.data.redis.connection) self.assertEqual("SET", rs1_span.data.redis.command) self.assertIsNone(rs1_span.data.redis.error) @@ -242,7 +249,7 @@ def test_old_redis_client(self): self.assertTrue('redis' in rs2_span.data.__dict__) self.assertEqual('redis-py', rs2_span.data.redis.driver) - self.assertEqual("redis://%s/0" % testenv['redis_url'], rs2_span.data.redis.connection) + self.assertEqual("redis://%s:6379/0" % testenv['redis_host'], rs2_span.data.redis.connection) self.assertEqual("SET", rs2_span.data.redis.command) self.assertIsNone(rs2_span.data.redis.error) @@ -256,7 +263,7 @@ def test_old_redis_client(self): self.assertTrue('redis' in rs3_span.data.__dict__) self.assertEqual('redis-py', rs3_span.data.redis.driver) - self.assertEqual("redis://%s/0" % testenv['redis_url'], rs3_span.data.redis.connection) + self.assertEqual("redis://%s:6379/0" % testenv['redis_host'], rs3_span.data.redis.connection) self.assertEqual("GET", rs3_span.data.redis.command) self.assertIsNone(rs3_span.data.redis.error) @@ -267,7 +274,7 @@ def test_old_redis_client(self): def test_pipelined_requests(self): result = None with tracer.start_active_span('test'): - pipe = self.strict_redis.pipeline() + pipe = self.client.pipeline() pipe.set('foox', 'barX') pipe.set('fooy', 'barY') pipe.get('foox') @@ -301,7 +308,7 @@ def test_pipelined_requests(self): self.assertTrue('redis' in rs1_span.data.__dict__) self.assertEqual('redis-py', rs1_span.data.redis.driver) - self.assertEqual("redis://%s/0" % testenv['redis_url'], rs1_span.data.redis.connection) + self.assertEqual("redis://%s:6379/0" % testenv['redis_host'], rs1_span.data.redis.connection) self.assertEqual("PIPELINE", rs1_span.data.redis.command) self.assertEqual(['SET', 'SET', 'GET'], rs1_span.data.redis.subCommands) self.assertIsNone(rs1_span.data.redis.error)