Skip to content

Commit

Permalink
aio-pika instrumentation: Removed check for non-sampled span when inj…
Browse files Browse the repository at this point in the history
…ect message headers. Reason to change is that sampled flag can be propagate https://www.w3.org/TR/trace-context/#sampled-flag and be useful when trace is not sampled.
  • Loading branch information
nesb1 committed Oct 6, 2023
1 parent e318c94 commit 0327842
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 11 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

### Fixed

- `opentelemetry-instrumentation-aio-pika` and `opentelemetry-instrumentation-pika` Fix missing trace context propagation when trace not recording.
([#1969](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1969))

## Version 1.20.0/0.41b0 (2023-09-01)

### Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ async def decorated_publish(
if not span:
return await publish(message, routing_key, **kwargs)
with trace.use_span(span, end_on_exit=True):
if span.is_recording():
propagate.inject(message.properties.headers)
propagate.inject(message.properties.headers)
return_value = await publish(message, routing_key, **kwargs)
return return_value

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import asyncio
from typing import Type
from unittest import TestCase, mock, skipIf
from unittest.mock import MagicMock

from aio_pika import Exchange, RobustExchange

Expand Down Expand Up @@ -92,6 +93,35 @@ def test_publish(self):
def test_robust_publish(self):
self._test_publish(RobustExchange)

def _test_publish_works_with_not_recording_span(self, exchange_type):
exchange = exchange_type(CONNECTION_7, CHANNEL_7, EXCHANGE_NAME)
with mock.patch.object(
PublishDecorator, "_get_publish_span"
) as mock_get_publish_span:
mocked_not_recording_span = MagicMock()
mocked_not_recording_span.is_recording.return_value = False
mock_get_publish_span.return_value = mocked_not_recording_span
with mock.patch.object(
Exchange, "publish", return_value=asyncio.sleep(0)
) as mock_publish:
with mock.patch(
"opentelemetry.instrumentation.aio_pika.publish_decorator.propagate.inject") as mock_inject:
decorated_publish = PublishDecorator(
self.tracer, exchange
).decorate(mock_publish)
self.loop.run_until_complete(
decorated_publish(MESSAGE, ROUTING_KEY)
)
mock_publish.assert_called_once()
mock_get_publish_span.assert_called_once()
mock_inject.assert_called_once()

def test_publish_works_with_not_recording_span(self):
self._test_publish_works_with_not_recording_span(Exchange)

def test_publish_works_with_not_recording_span_robust(self):
self._test_publish_works_with_not_recording_span(RobustExchange)


@skipIf(AIOPIKA_VERSION_INFO <= (8, 0), "Only for aio_pika 8")
class TestInstrumentedExchangeAioRmq8(TestCase):
Expand Down Expand Up @@ -144,3 +174,32 @@ def test_publish(self):

def test_robust_publish(self):
self._test_publish(RobustExchange)

def _test_publish_works_with_not_recording_span(self, exchange_type):
exchange = exchange_type(CONNECTION_7, CHANNEL_7, EXCHANGE_NAME)
with mock.patch.object(
PublishDecorator, "_get_publish_span"
) as mock_get_publish_span:
mocked_not_recording_span = MagicMock()
mocked_not_recording_span.is_recording.return_value = False
mock_get_publish_span.return_value = mocked_not_recording_span
with mock.patch.object(
Exchange, "publish", return_value=asyncio.sleep(0)
) as mock_publish:
with mock.patch(
"opentelemetry.instrumentation.aio_pika.publish_decorator.propagate.inject") as mock_inject:
decorated_publish = PublishDecorator(
self.tracer, exchange
).decorate(mock_publish)
self.loop.run_until_complete(
decorated_publish(MESSAGE, ROUTING_KEY)
)
mock_publish.assert_called_once()
mock_get_publish_span.assert_called_once()
mock_inject.assert_called_once()

def test_publish_works_with_not_recording_span(self):
self._test_publish_works_with_not_recording_span(Exchange)

def test_publish_works_with_not_recording_span_robust(self):
self._test_publish_works_with_not_recording_span(RobustExchange)
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,11 @@ def decorated_function(
exchange, routing_key, body, properties, mandatory
)
with trace.use_span(span, end_on_exit=True):
if span.is_recording():
propagate.inject(properties.headers)
try:
publish_hook(span, body, properties)
except Exception as hook_exception: # pylint: disable=W0703
_LOG.exception(hook_exception)
propagate.inject(properties.headers)
try:
publish_hook(span, body, properties)
except Exception as hook_exception: # pylint: disable=W0703
_LOG.exception(hook_exception)
retval = original_function(
exchange, routing_key, body, properties, mandatory
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,6 @@ def test_decorate_basic_publish(
use_span.assert_called_once_with(
get_span.return_value, end_on_exit=True
)
get_span.return_value.is_recording.assert_called_once()
inject.assert_called_once_with(properties.headers)
callback.assert_called_once_with(
exchange_name, routing_key, mock_body, properties, False
Expand Down Expand Up @@ -323,7 +322,6 @@ def test_decorate_basic_publish_no_properties(
use_span.assert_called_once_with(
get_span.return_value, end_on_exit=True
)
get_span.return_value.is_recording.assert_called_once()
inject.assert_called_once_with(basic_properties.return_value.headers)
self.assertEqual(retval, callback.return_value)

Expand Down Expand Up @@ -393,7 +391,6 @@ def test_decorate_basic_publish_with_hook(
use_span.assert_called_once_with(
get_span.return_value, end_on_exit=True
)
get_span.return_value.is_recording.assert_called_once()
inject.assert_called_once_with(properties.headers)
publish_hook.assert_called_once_with(
get_span.return_value, mock_body, properties
Expand All @@ -402,3 +399,52 @@ def test_decorate_basic_publish_with_hook(
exchange_name, routing_key, mock_body, properties, False
)
self.assertEqual(retval, callback.return_value)

@mock.patch("opentelemetry.instrumentation.pika.utils._get_span")
@mock.patch("opentelemetry.propagate.inject")
@mock.patch("opentelemetry.trace.use_span")
def test_decorate_basic_publish_when_span_is_not_recording(
self,
use_span: mock.MagicMock,
inject: mock.MagicMock,
get_span: mock.MagicMock,
) -> None:
callback = mock.MagicMock()
tracer = mock.MagicMock()
channel = mock.MagicMock(spec=Channel)
exchange_name = "test-exchange"
routing_key = "test-routing-key"
properties = mock.MagicMock()
mock_body = b"mock_body"
publish_hook = mock.MagicMock()

mocked_span = mock.MagicMock()
mocked_span.is_recording.return_value = False
get_span.return_value = mocked_span

decorated_basic_publish = utils._decorate_basic_publish(
callback, channel, tracer, publish_hook
)
retval = decorated_basic_publish(
exchange_name, routing_key, mock_body, properties
)
get_span.assert_called_once_with(
tracer,
channel,
properties,
destination=exchange_name,
span_kind=SpanKind.PRODUCER,
task_name="(temporary)",
operation=None,
)
use_span.assert_called_once_with(
get_span.return_value, end_on_exit=True
)
inject.assert_called_once_with(properties.headers)
publish_hook.assert_called_once_with(
get_span.return_value, mock_body, properties
)
callback.assert_called_once_with(
exchange_name, routing_key, mock_body, properties, False
)
self.assertEqual(retval, callback.return_value)

0 comments on commit 0327842

Please sign in to comment.