Skip to content

Commit

Permalink
Merge branch 'main' into opentelemetrybot/update-version-to-1.21.0.de…
Browse files Browse the repository at this point in the history
…v-0.42b0.dev
  • Loading branch information
ocelotl committed Sep 4, 2023
2 parents e35ca42 + fb9eb32 commit a0c9bf2
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#1889](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1889))
- Fixed union typing error not compatible with Python 3.7 introduced in `opentelemetry-util-http`, fix tests introduced by patch related to sanitize method for wsgi
([#1913](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1913))
- `opentelemetry-instrumentation-celery` Unwrap Celery's `ExceptionInfo` errors and report the actual exception that was raised. ([#1863](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1863))

### Added

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def add(x, y):
from timeit import default_timer
from typing import Collection, Iterable

from billiard.einfo import ExceptionInfo
from celery import signals # pylint: disable=no-name-in-module

from opentelemetry import trace
Expand All @@ -75,6 +76,13 @@ def add(x, y):
from opentelemetry.propagators.textmap import Getter
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace.status import Status, StatusCode
from billiard import VERSION


if VERSION >= (4, 0, 1):
from billiard.einfo import ExceptionWithTraceback
else:
ExceptionWithTraceback = None

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -271,6 +279,18 @@ def _trace_failure(*args, **kwargs):
return

if ex is not None:
# Unwrap the actual exception wrapped by billiard's
# `ExceptionInfo` and `ExceptionWithTraceback`.
if isinstance(ex, ExceptionInfo) and ex.exception is not None:
ex = ex.exception

if (
ExceptionWithTraceback is not None
and isinstance(ex, ExceptionWithTraceback)
and ex.exc is not None
):
ex = ex.exc

status_kwargs["description"] = str(ex)
span.record_exception(ex)
span.set_status(Status(**status_kwargs))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import logging

from celery import registry # pylint: disable=no-name-in-module
from billiard import VERSION

from opentelemetry.semconv.trace import SpanAttributes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ class Config:
app.config_from_object(Config)


class CustomError(Exception):
pass


@app.task
def task_add(num_a, num_b):
return num_a + num_b


@app.task
def task_raises():
raise CustomError("The task failed!")
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.test.test_base import TestBase
from opentelemetry.trace import SpanKind
from opentelemetry.trace import SpanKind, StatusCode

from .celery_test_tasks import app, task_add
from .celery_test_tasks import app, task_add, task_raises


class TestCeleryInstrumentation(TestBase):
Expand Down Expand Up @@ -66,6 +66,10 @@ def test_task(self):
},
)

self.assertEqual(consumer.status.status_code, StatusCode.UNSET)

self.assertEqual(0, len(consumer.events))

self.assertEqual(
producer.name, "apply_async/tests.celery_test_tasks.task_add"
)
Expand All @@ -84,6 +88,70 @@ def test_task(self):
self.assertEqual(consumer.parent.span_id, producer.context.span_id)
self.assertEqual(consumer.context.trace_id, producer.context.trace_id)

def test_task_raises(self):
CeleryInstrumentor().instrument()

result = task_raises.delay()

timeout = time.time() + 60 * 1 # 1 minutes from now
while not result.ready():
if time.time() > timeout:
break
time.sleep(0.05)

spans = self.sorted_spans(self.memory_exporter.get_finished_spans())
self.assertEqual(len(spans), 2)

consumer, producer = spans

self.assertEqual(
consumer.name, "run/tests.celery_test_tasks.task_raises"
)
self.assertEqual(consumer.kind, SpanKind.CONSUMER)
self.assertSpanHasAttributes(
consumer,
{
"celery.action": "run",
"celery.state": "FAILURE",
SpanAttributes.MESSAGING_DESTINATION: "celery",
"celery.task_name": "tests.celery_test_tasks.task_raises",
},
)

self.assertEqual(consumer.status.status_code, StatusCode.ERROR)

self.assertEqual(1, len(consumer.events))
event = consumer.events[0]

self.assertIn(SpanAttributes.EXCEPTION_STACKTRACE, event.attributes)

self.assertEqual(
event.attributes[SpanAttributes.EXCEPTION_TYPE], "CustomError"
)

self.assertEqual(
event.attributes[SpanAttributes.EXCEPTION_MESSAGE],
"The task failed!",
)

self.assertEqual(
producer.name, "apply_async/tests.celery_test_tasks.task_raises"
)
self.assertEqual(producer.kind, SpanKind.PRODUCER)
self.assertSpanHasAttributes(
producer,
{
"celery.action": "apply_async",
"celery.task_name": "tests.celery_test_tasks.task_raises",
SpanAttributes.MESSAGING_DESTINATION_KIND: "queue",
SpanAttributes.MESSAGING_DESTINATION: "celery",
},
)

self.assertNotEqual(consumer.parent, producer.context)
self.assertEqual(consumer.parent.span_id, producer.context.span_id)
self.assertEqual(consumer.context.trace_id, producer.context.trace_id)

def test_uninstrument(self):
CeleryInstrumentor().instrument()
CeleryInstrumentor().uninstrument()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def _instrument(self, **kwargs):
``tracer_provider``: a TracerProvider, defaults to global
``meter_provider``: a MeterProvider, defaults to global
``enable_commenter``: bool to enable sqlcommenter, defaults to False
``commenter_options``: dict of sqlcommenter config, defaults to None
``commenter_options``: dict of sqlcommenter config, defaults to {}
Returns:
An instrumented engine if passed in as an argument or list of instrumented engines, None otherwise.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ def fn_exception():
assert len(span.events) == 1
event = span.events[0]
assert event.name == "exception"
assert event.attributes[SpanAttributes.EXCEPTION_TYPE] == "ExceptionInfo"
assert event.attributes[SpanAttributes.EXCEPTION_TYPE] == "Exception"
assert SpanAttributes.EXCEPTION_MESSAGE in event.attributes
assert (
span.attributes.get(SpanAttributes.MESSAGING_MESSAGE_ID)
Expand Down Expand Up @@ -420,7 +420,7 @@ def run(self):
assert "Task class is failing" in span.status.description


def test_class_task_exception_excepted(celery_app, memory_exporter):
def test_class_task_exception_expected(celery_app, memory_exporter):
class BaseTask(celery_app.Task):
throws = (MyException,)

Expand Down

0 comments on commit a0c9bf2

Please sign in to comment.