diff --git a/instana/instrumentation/celery/hooks.py b/instana/instrumentation/celery/hooks.py index 71829ad1..188d0d1d 100644 --- a/instana/instrumentation/celery/hooks.py +++ b/instana/instrumentation/celery/hooks.py @@ -10,6 +10,21 @@ from .catalog import task_catalog_get, task_catalog_pop, task_catalog_push, get_task_id from celery.contrib import rdb + try: + from urllib import parse + except ImportError: + import urlparse as parse + import urllib + + def add_broker_tags(span, broker_url): + try: + url = parse.urlparse(broker_url) + span.set_tag("scheme", url.scheme) + span.set_tag("host", url.hostname) + span.set_tag("port", url.port) + except: + logger.debug("Error parsing broker URL: %s" % broker_url, exc_info=True) + @signals.task_prerun.connect def task_prerun(*args, **kwargs): try: @@ -24,7 +39,7 @@ def task_prerun(*args, **kwargs): scope = tracer.start_active_span("celery-worker", child_of=ctx) scope.span.set_tag("task", task.name) scope.span.set_tag("task_id", task_id) - scope.span.set_tag("broker", task.app.conf['broker_url']) + add_broker_tags(scope.span, task.app.conf['broker_url']) # Store the scope on the task to eventually close it out on the "after" signal task_catalog_push(task, task_id, scope, True) @@ -86,8 +101,8 @@ def before_task_publish(*args, **kwargs): scope = tracer.start_active_span("celery-client", child_of=parent_span) scope.span.set_tag("task", task_name) - scope.span.set_tag("broker", task.app.conf['broker_url']) scope.span.set_tag("task_id", task_id) + add_broker_tags(scope.span, task.app.conf['broker_url']) # Context propagation context_headers = {} diff --git a/instana/span.py b/instana/span.py index 8ad5abce..78330817 100644 --- a/instana/span.py +++ b/instana/span.py @@ -265,7 +265,9 @@ def _populate_entry_span_data(self, span): elif span.operation_name == "celery-worker": self.data["celery"]["task"] = span.tags.pop('task', None) self.data["celery"]["task_id"] = span.tags.pop('task_id', None) - self.data["celery"]["broker"] = span.tags.pop('broker', None) + self.data["celery"]["scheme"] = span.tags.pop('scheme', None) + self.data["celery"]["host"] = span.tags.pop('host', None) + self.data["celery"]["port"] = span.tags.pop('port', None) self.data["celery"]["retry-reason"] = span.tags.pop('retry-reason', None) self.data["celery"]["error"] = span.tags.pop('error', None) @@ -314,7 +316,9 @@ def _populate_exit_span_data(self, span): elif span.operation_name == "celery-client": self.data["celery"]["task"] = span.tags.pop('task', None) self.data["celery"]["task_id"] = span.tags.pop('task_id', None) - self.data["celery"]["broker"] = span.tags.pop('broker', None) + self.data["celery"]["scheme"] = span.tags.pop('scheme', None) + self.data["celery"]["host"] = span.tags.pop('host', None) + self.data["celery"]["port"] = span.tags.pop('port', None) self.data["celery"]["error"] = span.tags.pop('error', None) elif span.operation_name == "couchbase": diff --git a/tests/clients/test_couchbase.py b/tests/clients/test_couchbase.py index 99400ce6..f1ddefde 100644 --- a/tests/clients/test_couchbase.py +++ b/tests/clients/test_couchbase.py @@ -1,6 +1,6 @@ from __future__ import absolute_import -import pytest +import time import unittest from instana.singletons import tracer @@ -28,16 +28,15 @@ class TestStandardCouchDB(unittest.TestCase): def setUp(self): """ Clear all spans before a test run """ self.recorder = tracer.recorder - self.recorder.clear_spans() self.cluster = Cluster('couchbase://%s' % testenv['couchdb_host']) self.bucket = Bucket('couchbase://%s/travel-sample' % testenv['couchdb_host'], username=testenv['couchdb_username'], password=testenv['couchdb_password']) # self.bucket = self.cluster.open_bucket('travel-sample') self.bucket.upsert('test-key', 1) + self.recorder.clear_spans() def tearDown(self): - """ Do nothing for now """ - return None + time.sleep(0.5) def test_vanilla_get(self): res = self.bucket.get("test-key") @@ -474,7 +473,6 @@ def test_prepend_multi(self): self.assertEqual(cb_span.data["couchbase"]["bucket"], 'travel-sample') self.assertEqual(cb_span.data["couchbase"]["type"], 'prepend_multi') - @pytest.mark.skip(reason="Failing test for unchanged instrumentation; todo") def test_get(self): res = None @@ -1080,7 +1078,6 @@ def test_ping(self): self.assertEqual(cb_span.data["couchbase"]["bucket"], 'travel-sample') self.assertEqual(cb_span.data["couchbase"]["type"], 'ping') - @pytest.mark.skip def test_diagnostics(self): res = None diff --git a/tests/frameworks/test_celery.py b/tests/frameworks/test_celery.py index ab33cfba..497a537c 100644 --- a/tests/frameworks/test_celery.py +++ b/tests/frameworks/test_celery.py @@ -49,13 +49,17 @@ def test_apply_async(celery_app, celery_worker): assert(client_span.p == test_span.s) assert("tests.frameworks.test_celery.add" == client_span.data["celery"]["task"]) - assert("redis://localhost:6379" == client_span.data["celery"]["broker"]) + assert("redis" == client_span.data["celery"]["scheme"]) + assert("localhost" == client_span.data["celery"]["host"]) + assert(6379 == client_span.data["celery"]["port"]) assert(client_span.data["celery"]["task_id"]) assert(client_span.data["celery"]["error"] == None) assert(client_span.ec == None) assert("tests.frameworks.test_celery.add" == worker_span.data["celery"]["task"]) - assert("redis://localhost:6379" == worker_span.data["celery"]["broker"]) + assert("redis" == worker_span.data["celery"]["scheme"]) + assert("localhost" == worker_span.data["celery"]["host"]) + assert(6379 == worker_span.data["celery"]["port"]) assert(worker_span.data["celery"]["task_id"]) assert(worker_span.data["celery"]["error"] == None) assert(worker_span.data["celery"]["retry-reason"] == None) @@ -90,13 +94,17 @@ def test_delay(celery_app, celery_worker): assert(client_span.p == test_span.s) assert("tests.frameworks.test_celery.add" == client_span.data["celery"]["task"]) - assert("redis://localhost:6379" == client_span.data["celery"]["broker"]) + assert("redis" == client_span.data["celery"]["scheme"]) + assert("localhost" == client_span.data["celery"]["host"]) + assert(6379 == client_span.data["celery"]["port"]) assert(client_span.data["celery"]["task_id"]) assert(client_span.data["celery"]["error"] == None) assert(client_span.ec == None) assert("tests.frameworks.test_celery.add" == worker_span.data["celery"]["task"]) - assert("redis://localhost:6379" == worker_span.data["celery"]["broker"]) + assert("redis" == worker_span.data["celery"]["scheme"]) + assert("localhost" == worker_span.data["celery"]["host"]) + assert(6379 == worker_span.data["celery"]["port"]) assert(worker_span.data["celery"]["task_id"]) assert(worker_span.data["celery"]["error"] == None) assert(worker_span.data["celery"]["retry-reason"] == None) @@ -131,13 +139,17 @@ def test_send_task(celery_app, celery_worker): assert(client_span.p == test_span.s) assert("tests.frameworks.test_celery.add" == client_span.data["celery"]["task"]) - assert("redis://localhost:6379" == client_span.data["celery"]["broker"]) + assert("redis" == client_span.data["celery"]["scheme"]) + assert("localhost" == client_span.data["celery"]["host"]) + assert(6379 == client_span.data["celery"]["port"]) assert(client_span.data["celery"]["task_id"]) assert(client_span.data["celery"]["error"] == None) assert(client_span.ec == None) assert("tests.frameworks.test_celery.add" == worker_span.data["celery"]["task"]) - assert("redis://localhost:6379" == worker_span.data["celery"]["broker"]) + assert("redis" == worker_span.data["celery"]["scheme"]) + assert("localhost" == worker_span.data["celery"]["host"]) + assert(6379 == worker_span.data["celery"]["port"]) assert(worker_span.data["celery"]["task_id"]) assert(worker_span.data["celery"]["error"] == None) assert(worker_span.data["celery"]["retry-reason"] == None) @@ -172,13 +184,17 @@ def test_error_reporting(celery_app, celery_worker): assert(client_span.p == test_span.s) assert("tests.frameworks.test_celery.will_raise_error" == client_span.data["celery"]["task"]) - assert("redis://localhost:6379" == client_span.data["celery"]["broker"]) + assert("redis" == client_span.data["celery"]["scheme"]) + assert("localhost" == client_span.data["celery"]["host"]) + assert(6379 == client_span.data["celery"]["port"]) assert(client_span.data["celery"]["task_id"]) assert(client_span.data["celery"]["error"] == None) assert(client_span.ec == None) assert("tests.frameworks.test_celery.will_raise_error" == worker_span.data["celery"]["task"]) - assert("redis://localhost:6379" == worker_span.data["celery"]["broker"]) + assert("redis" == worker_span.data["celery"]["scheme"]) + assert("localhost" == worker_span.data["celery"]["host"]) + assert(6379 == worker_span.data["celery"]["port"]) assert(worker_span.data["celery"]["task_id"]) assert(worker_span.data["celery"]["error"] == 'This is a simulated error') assert(worker_span.data["celery"]["retry-reason"] == None)