Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 25 additions & 10 deletions instana/instrumentation/celery/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,44 @@ def add_broker_tags(span, broker_url):
try:
url = parse.urlparse(broker_url)
span.set_tag("scheme", url.scheme)
span.set_tag("host", url.hostname)
span.set_tag("port", url.port)

if url.hostname is None:
span.set_tag("host", 'localhost')
else:
span.set_tag("host", url.hostname)

if url.port is None:
# Set default port if not specified
if url.scheme == 'redis':
span.set_tag("port", "6379")
elif 'amqp' in url.scheme:
span.set_tag("port", "5672")
elif 'sqs' in url.scheme:
span.set_tag("port", "443")
else:
span.set_tag("port", str(url.port))
except:
logger.debug("Error parsing broker URL: %s" % broker_url, exc_info=True)

@signals.task_prerun.connect
def task_prerun(*args, **kwargs):
try:
ctx = None
task = kwargs.get('sender', None)
task_id = kwargs.get('task_id', None)
task = registry.tasks.get(task.name)

headers = task.request.get('headers', {})
ctx = tracer.extract(opentracing.Format.HTTP_HEADERS, headers)
if headers is not None:
ctx = tracer.extract(opentracing.Format.HTTP_HEADERS, headers)

if ctx is not None:
scope = tracer.start_active_span("celery-worker", child_of=ctx)
scope.span.set_tag("task", task.name)
scope.span.set_tag("task_id", task_id)
add_broker_tags(scope.span, task.app.conf['broker_url'])
scope = tracer.start_active_span("celery-worker", child_of=ctx)
scope.span.set_tag("task", task.name)
scope.span.set_tag("task_id", task_id)
add_broker_tags(scope.span, task.app.conf['broker_url'])

# Store the scope on the task to eventually close it out on the "after" signal
task_catalog_push(task, task_id, scope, True)
# Store the scope on the task to eventually close it out on the "after" signal
task_catalog_push(task, task_id, scope, True)
except:
logger.debug("task_prerun: ", exc_info=True)

Expand Down
17 changes: 11 additions & 6 deletions instana/instrumentation/logging.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from __future__ import absolute_import

import sys
import wrapt
import logging
import sys
import collections

from ..log import logger
from ..singletons import tracer
Expand All @@ -18,10 +19,14 @@ def log_with_instana(wrapped, instance, argv, kwargs):

# Only needed if we're tracing and serious log
if parent_span and argv[0] >= logging.WARN:

msg = str(argv[1])
args = argv[2]
if args and len(args) == 1 and isinstance(args[0], collections.Mapping) and args[0]:
args = args[0]

# get the formatted log message
# clients such as suds-jurko log things such as: Fault(Server: 'Server side fault example.')
# So make sure we're working with a string
msg = str(argv[1]) % argv[2]
msg = msg % args

# get additional information if an exception is being handled
parameters = None
Expand All @@ -37,8 +42,8 @@ def log_with_instana(wrapped, instance, argv, kwargs):
# extra tags for an error
if argv[0] >= logging.ERROR:
scope.span.mark_as_errored()
except Exception as e:
logger.debug('Exception: %s', e, exc_info=True)
except Exception:
logger.debug('log_with_instana:', exc_info=True)
finally:
return wrapped(*argv, **kwargs)

Expand Down
43 changes: 30 additions & 13 deletions tests/frameworks/test_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ def will_raise_error():
raise Exception('This is a simulated error')


def filter_out_ping_tasks(spans):
filtered_spans = []
for span in spans:
is_ping_task = (span.n == 'celery-worker' and span.data['celery']['task'] == 'celery.ping')
if not is_ping_task:
filtered_spans.append(span)
return filtered_spans


def setup_method():
""" Clear all spans before a test run """
tracer.recorder.clear_spans()
Expand All @@ -29,7 +38,7 @@ def test_apply_async(celery_app, celery_worker):
# Wait for jobs to finish
time.sleep(0.5)

spans = tracer.recorder.queued_spans()
spans = filter_out_ping_tasks(tracer.recorder.queued_spans())
assert len(spans) == 3

filter = lambda span: span.n == "sdk"
Expand All @@ -51,15 +60,15 @@ def test_apply_async(celery_app, celery_worker):
assert("tests.frameworks.test_celery.add" == client_span.data["celery"]["task"])
assert("redis" == client_span.data["celery"]["scheme"])
assert("localhost" == client_span.data["celery"]["host"])
assert(6379 == client_span.data["celery"]["port"])
assert("6379" == client_span.data["celery"]["port"])
assert(client_span.data["celery"]["task_id"])
assert(client_span.data["celery"]["error"] == None)
assert(client_span.ec == None)

assert("tests.frameworks.test_celery.add" == worker_span.data["celery"]["task"])
assert("redis" == worker_span.data["celery"]["scheme"])
assert("localhost" == worker_span.data["celery"]["host"])
assert(6379 == worker_span.data["celery"]["port"])
assert("6379" == worker_span.data["celery"]["port"])
assert(worker_span.data["celery"]["task_id"])
assert(worker_span.data["celery"]["error"] == None)
assert(worker_span.data["celery"]["retry-reason"] == None)
Expand All @@ -74,7 +83,7 @@ def test_delay(celery_app, celery_worker):
# Wait for jobs to finish
time.sleep(0.5)

spans = tracer.recorder.queued_spans()
spans = filter_out_ping_tasks(tracer.recorder.queued_spans())
assert len(spans) == 3

filter = lambda span: span.n == "sdk"
Expand All @@ -96,15 +105,15 @@ def test_delay(celery_app, celery_worker):
assert("tests.frameworks.test_celery.add" == client_span.data["celery"]["task"])
assert("redis" == client_span.data["celery"]["scheme"])
assert("localhost" == client_span.data["celery"]["host"])
assert(6379 == client_span.data["celery"]["port"])
assert("6379" == client_span.data["celery"]["port"])
assert(client_span.data["celery"]["task_id"])
assert(client_span.data["celery"]["error"] == None)
assert(client_span.ec == None)

assert("tests.frameworks.test_celery.add" == worker_span.data["celery"]["task"])
assert("redis" == worker_span.data["celery"]["scheme"])
assert("localhost" == worker_span.data["celery"]["host"])
assert(6379 == worker_span.data["celery"]["port"])
assert("6379" == worker_span.data["celery"]["port"])
assert(worker_span.data["celery"]["task_id"])
assert(worker_span.data["celery"]["error"] == None)
assert(worker_span.data["celery"]["retry-reason"] == None)
Expand All @@ -119,7 +128,7 @@ def test_send_task(celery_app, celery_worker):
# Wait for jobs to finish
time.sleep(0.5)

spans = tracer.recorder.queued_spans()
spans = filter_out_ping_tasks(tracer.recorder.queued_spans())
assert len(spans) == 3

filter = lambda span: span.n == "sdk"
Expand All @@ -141,15 +150,15 @@ def test_send_task(celery_app, celery_worker):
assert("tests.frameworks.test_celery.add" == client_span.data["celery"]["task"])
assert("redis" == client_span.data["celery"]["scheme"])
assert("localhost" == client_span.data["celery"]["host"])
assert(6379 == client_span.data["celery"]["port"])
assert("6379" == client_span.data["celery"]["port"])
assert(client_span.data["celery"]["task_id"])
assert(client_span.data["celery"]["error"] == None)
assert(client_span.ec == None)

assert("tests.frameworks.test_celery.add" == worker_span.data["celery"]["task"])
assert("redis" == worker_span.data["celery"]["scheme"])
assert("localhost" == worker_span.data["celery"]["host"])
assert(6379 == worker_span.data["celery"]["port"])
assert("6379" == worker_span.data["celery"]["port"])
assert(worker_span.data["celery"]["task_id"])
assert(worker_span.data["celery"]["error"] == None)
assert(worker_span.data["celery"]["retry-reason"] == None)
Expand All @@ -164,8 +173,8 @@ def test_error_reporting(celery_app, celery_worker):
# Wait for jobs to finish
time.sleep(0.5)

spans = tracer.recorder.queued_spans()
assert len(spans) == 3
spans = filter_out_ping_tasks(tracer.recorder.queued_spans())
assert len(spans) == 4

filter = lambda span: span.n == "sdk"
test_span = get_first_span_by_filter(spans, filter)
Expand All @@ -175,26 +184,34 @@ def test_error_reporting(celery_app, celery_worker):
client_span = get_first_span_by_filter(spans, filter)
assert(client_span)

filter = lambda span: span.n == "log"
log_span = get_first_span_by_filter(spans, filter)
assert(log_span)

filter = lambda span: span.n == "celery-worker"
worker_span = get_first_span_by_filter(spans, filter)
assert(worker_span)

assert(client_span.t == test_span.t)
assert(client_span.t == worker_span.t)
assert(client_span.t == log_span.t)

assert(client_span.p == test_span.s)
assert(worker_span.p == client_span.s)
assert(log_span.p == worker_span.s)

assert("tests.frameworks.test_celery.will_raise_error" == client_span.data["celery"]["task"])
assert("redis" == client_span.data["celery"]["scheme"])
assert("localhost" == client_span.data["celery"]["host"])
assert(6379 == client_span.data["celery"]["port"])
assert("6379" == client_span.data["celery"]["port"])
assert(client_span.data["celery"]["task_id"])
assert(client_span.data["celery"]["error"] == None)
assert(client_span.ec == None)

assert("tests.frameworks.test_celery.will_raise_error" == worker_span.data["celery"]["task"])
assert("redis" == worker_span.data["celery"]["scheme"])
assert("localhost" == worker_span.data["celery"]["host"])
assert(6379 == worker_span.data["celery"]["port"])
assert("6379" == worker_span.data["celery"]["port"])
assert(worker_span.data["celery"]["task_id"])
assert(worker_span.data["celery"]["error"] == 'This is a simulated error')
assert(worker_span.data["celery"]["retry-reason"] == None)
Expand Down