Skip to content

Commit

Permalink
Add support for anonymous tasks (#1407)
Browse files Browse the repository at this point in the history
  • Loading branch information
naormalca committed Apr 28, 2023
1 parent f46a6e1 commit 46e4b1d
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 3 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#1730](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1730))
- Make ASGI request span attributes available for `start_span`.
([#1762](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1762))
- `opentelemetry-instrumentation-celery` Add support for anonymous tasks.
([#1407](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1407)


### Fixed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,17 @@ def _trace_before_publish(self, *args, **kwargs):
task = utils.retrieve_task_from_sender(kwargs)
task_id = utils.retrieve_task_id_from_message(kwargs)

if task is None or task_id is None:
if task_id is None:
return

operation_name = f"{_TASK_APPLY_ASYNC}/{task.name}"
if task is None:
# task is an anonymous task send using send_task or using canvas workflow
# Signatures() to send to a task not in the current processes dependency
# tree
task_name = kwargs.get("sender", "unknown")
else:
task_name = task.name
operation_name = f"{_TASK_APPLY_ASYNC}/{task_name}"
span = self._tracer.start_span(
operation_name, kind=trace.SpanKind.PRODUCER
)
Expand All @@ -195,7 +202,7 @@ def _trace_before_publish(self, *args, **kwargs):
if span.is_recording():
span.set_attribute(_TASK_TAG_KEY, _TASK_APPLY_ASYNC)
span.set_attribute(SpanAttributes.MESSAGING_MESSAGE_ID, task_id)
span.set_attribute(_TASK_NAME_KEY, task.name)
span.set_attribute(_TASK_NAME_KEY, task_name)
utils.set_attributes_from_context(span, kwargs)

activation = trace.use_span(span, end_on_exit=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ def attach_span(task, task_id, span, is_publish=False):
NOTE: We cannot test for this well yet, because we do not run a celery worker,
and cannot run `task.apply_async()`
"""
if task is None:
return
span_dict = getattr(task, CTX_KEY, None)
if span_dict is None:
span_dict = {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def tearDown(self):
CeleryInstrumentor().uninstrument()
self._worker.stop()
self._thread.join()
CeleryInstrumentor().uninstrument()

def test_task(self):
CeleryInstrumentor().instrument()
Expand Down Expand Up @@ -97,3 +98,52 @@ def test_uninstrument(self):

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 0)


class TestCelerySignatureTask(TestBase):
def setUp(self):
super().setUp()

def start_app(*args, **kwargs):
# Add an additional task that will not be registered with parent thread
@app.task
def hidden_task(num_a):
return num_a * 2

self._worker = app.Worker(app=app, pool="solo", concurrency=1)
return self._worker.start(*args, **kwargs)

self._thread = threading.Thread(target=start_app)
self._worker = app.Worker(app=app, pool="solo", concurrency=1)
self._thread.daemon = True
self._thread.start()

def tearDown(self):
super().tearDown()
self._worker.stop()
self._thread.join()
CeleryInstrumentor().uninstrument()

def test_hidden_task(self):
# no-op since already instrumented
CeleryInstrumentor().instrument()

res = app.signature("tests.test_tasks.hidden_task", (2,)).apply_async()
while not res.ready():
time.sleep(0.05)
spans = self.sorted_spans(self.memory_exporter.get_finished_spans())
self.assertEqual(len(spans), 2)

consumer, producer = spans

self.assertEqual(consumer.name, "run/tests.test_tasks.hidden_task")
self.assertEqual(consumer.kind, SpanKind.CONSUMER)

self.assertEqual(
producer.name, "apply_async/tests.test_tasks.hidden_task"
)
self.assertEqual(producer.kind, SpanKind.PRODUCER)

self.assertNotEqual(consumer.parent, producer.context)
self.assertEqual(consumer.parent.span_id, producer.context.span_id)
self.assertEqual(consumer.context.trace_id, producer.context.trace_id)
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,13 @@ def fn_task():
utils.detach_span(fn_task, task_id)
self.assertEqual(utils.retrieve_span(fn_task, task_id), (None, None))

def test_optional_task_span_attach(self):
task_id = "7c6731af-9533-40c3-83a9-25b58f0d837f"
span = trace._Span("name", mock.Mock(spec=trace_api.SpanContext))

# assert this is is a no-aop
self.assertIsNone(utils.attach_span(None, task_id, span))

def test_span_delete_empty(self):
# ensure detach_span doesn't raise an exception if span is not present
@self.app.task
Expand Down

0 comments on commit 46e4b1d

Please sign in to comment.