From 74cfdeb1cf0d498fb23d1bb55c0a7b77cbc772e8 Mon Sep 17 00:00:00 2001 From: Shivanshu Raj Shrivastava Date: Thu, 13 Feb 2025 17:12:34 +0530 Subject: [PATCH 1/4] fix: add messaging.system for celery Signed-off-by: Shivanshu Raj Shrivastava --- .../src/opentelemetry/instrumentation/celery/__init__.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py index 908f158507..f37f6a9c0b 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py @@ -96,6 +96,8 @@ def add(x, y): _TASK_REVOKED_TERMINATED_SIGNAL_KEY = "celery.terminated.signal" _TASK_NAME_KEY = "celery.task_name" +_QUEUE_NAME = "queue" + class CeleryGetter(Getter): def get(self, carrier, key): @@ -204,6 +206,7 @@ def _trace_postrun(self, *args, **kwargs): # request context tags if span.is_recording(): span.set_attribute(_TASK_TAG_KEY, _TASK_RUN) + span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, _QUEUE_NAME) utils.set_attributes_from_context(span, kwargs) utils.set_attributes_from_context(span, task.request) span.set_attribute(_TASK_NAME_KEY, task.name) @@ -241,6 +244,7 @@ def _trace_before_publish(self, *args, **kwargs): if span.is_recording(): span.set_attribute(_TASK_TAG_KEY, _TASK_APPLY_ASYNC) span.set_attribute(SpanAttributes.MESSAGING_MESSAGE_ID, task_id) + span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, _QUEUE_NAME) span.set_attribute(_TASK_NAME_KEY, task_name) utils.set_attributes_from_context(span, kwargs) From ad7a73bf35e218492a007b1e688682e9d2acb832 Mon Sep 17 00:00:00 2001 From: Shivanshu Raj Shrivastava Date: Thu, 13 Feb 2025 17:23:40 +0530 Subject: [PATCH 2/4] fix: add changelog entry Signed-off-by: Shivanshu Raj Shrivastava --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 101cafd361..7ed3a582ed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- `opentelemetry-instrumentation-celery` Add messaging.system for celery + ([#3265](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3265)) + - `opentelemetry-instrumentation-botocore` Add support for GenAI user events and lazy initialize tracer ([#3258](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3258)) - `opentelemetry-instrumentation-botocore` Add support for GenAI system events From 8e134823e25ea977af8a85dbaa2b069771dacd2e Mon Sep 17 00:00:00 2001 From: Shivanshu Raj Shrivastava Date: Wed, 19 Feb 2025 23:51:41 +0530 Subject: [PATCH 3/4] test: add unit tests Signed-off-by: Shivanshu Raj Shrivastava --- .../instrumentation/celery/__init__.py | 2 +- .../tests/test_tasks.py | 36 +++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py index f37f6a9c0b..f23fc9ca4c 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py @@ -96,7 +96,7 @@ def add(x, y): _TASK_REVOKED_TERMINATED_SIGNAL_KEY = "celery.terminated.signal" _TASK_NAME_KEY = "celery.task_name" -_QUEUE_NAME = "queue" +_QUEUE_NAME = "celery" class CeleryGetter(Getter): diff --git a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py index c68b1bc758..e8ed5c0ea3 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py +++ b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py @@ -92,6 +92,42 @@ def test_task(self): self.assertEqual(consumer.parent.span_id, producer.context.span_id) self.assertEqual(consumer.context.trace_id, producer.context.trace_id) + def test_queue_name(self): + CeleryInstrumentor().instrument() + + result = task_add.delay(1, 2) + + timeout = time.time() + 60 * 1 # 1 minutes from now + while not result.ready(): + if time.time() > timeout: + break + time.sleep(0.05) + + spans = self.sorted_spans(self.memory_exporter.get_finished_spans()) + self.assertEqual(len(spans), 2) + + consumer, producer = spans + + self.assertEqual(consumer.name, "run/tests.celery_test_tasks.task_add") + self.assertEqual(consumer.kind, SpanKind.CONSUMER) + self.assertSpanHasAttributes( + consumer, + { + "celery.action": "run", + "celery.state": "SUCCESS", + SpanAttributes.MESSAGING_SYSTEM: "celery", + "celery.task_name": "tests.celery_test_tasks.task_add", + }, + ) + self.assertSpanHasAttributes( + producer, + { + "celery.action": "apply_async", + "celery.task_name": "tests.celery_test_tasks.task_add", + SpanAttributes.MESSAGING_SYSTEM: "celery", + }, + ) + def test_task_raises(self): CeleryInstrumentor().instrument() From 437648e6680b948926ee78157e01f0ceb73013a8 Mon Sep 17 00:00:00 2001 From: Shivanshu Raj Shrivastava Date: Thu, 20 Feb 2025 15:40:41 +0530 Subject: [PATCH 4/4] chore: review comments Signed-off-by: Shivanshu Raj Shrivastava --- .../tests/test_tasks.py | 40 ++----------------- 1 file changed, 4 insertions(+), 36 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py index e8ed5c0ea3..a7a2291833 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py +++ b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py @@ -67,6 +67,7 @@ def test_task(self): "celery.state": "SUCCESS", SpanAttributes.MESSAGING_DESTINATION: "celery", "celery.task_name": "tests.celery_test_tasks.task_add", + SpanAttributes.MESSAGING_SYSTEM: "celery", }, ) @@ -85,6 +86,7 @@ def test_task(self): "celery.task_name": "tests.celery_test_tasks.task_add", SpanAttributes.MESSAGING_DESTINATION_KIND: "queue", SpanAttributes.MESSAGING_DESTINATION: "celery", + SpanAttributes.MESSAGING_SYSTEM: "celery", }, ) @@ -92,42 +94,6 @@ def test_task(self): self.assertEqual(consumer.parent.span_id, producer.context.span_id) self.assertEqual(consumer.context.trace_id, producer.context.trace_id) - def test_queue_name(self): - CeleryInstrumentor().instrument() - - result = task_add.delay(1, 2) - - timeout = time.time() + 60 * 1 # 1 minutes from now - while not result.ready(): - if time.time() > timeout: - break - time.sleep(0.05) - - spans = self.sorted_spans(self.memory_exporter.get_finished_spans()) - self.assertEqual(len(spans), 2) - - consumer, producer = spans - - self.assertEqual(consumer.name, "run/tests.celery_test_tasks.task_add") - self.assertEqual(consumer.kind, SpanKind.CONSUMER) - self.assertSpanHasAttributes( - consumer, - { - "celery.action": "run", - "celery.state": "SUCCESS", - SpanAttributes.MESSAGING_SYSTEM: "celery", - "celery.task_name": "tests.celery_test_tasks.task_add", - }, - ) - self.assertSpanHasAttributes( - producer, - { - "celery.action": "apply_async", - "celery.task_name": "tests.celery_test_tasks.task_add", - SpanAttributes.MESSAGING_SYSTEM: "celery", - }, - ) - def test_task_raises(self): CeleryInstrumentor().instrument() @@ -155,6 +121,7 @@ def test_task_raises(self): "celery.state": "FAILURE", SpanAttributes.MESSAGING_DESTINATION: "celery", "celery.task_name": "tests.celery_test_tasks.task_raises", + SpanAttributes.MESSAGING_SYSTEM: "celery", }, ) @@ -186,6 +153,7 @@ def test_task_raises(self): "celery.task_name": "tests.celery_test_tasks.task_raises", SpanAttributes.MESSAGING_DESTINATION_KIND: "queue", SpanAttributes.MESSAGING_DESTINATION: "celery", + SpanAttributes.MESSAGING_SYSTEM: "celery", }, )