Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix: Pika basicConsume context propagation #766

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- `opentelemetry-instrumentation-asgi` now explicitly depends on asgiref as it uses the package instead of instrumenting it.
([#765](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/765))
- `opentelemetry-instrumentation-pika` now propagates context to basic_consume callback
([#766](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/766))

## [1.6.2-0.25b2](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.6.2-0.25b2) - 2021-10-19

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from logging import getLogger
from typing import Any, Callable, Collection, Dict, Optional
from typing import Any, Collection, Dict, Optional

import wrapt
from pika.adapters import BlockingConnection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,16 @@ def decorated_callback(
tracer,
channel,
properties,
destination=method.exchange if method.exchange else method.routing_key,
span_kind=SpanKind.CONSUMER,
task_name=task_name,
operation=MessagingOperationValues.RECEIVE,
)
with trace.use_span(span, end_on_exit=True):
retval = callback(channel, method, properties, body)
context.detach(token)
try:
with trace.use_span(span, end_on_exit=True):
retval = callback(channel, method, properties, body)
finally:
context.detach(token)
return retval

return decorated_callback
Expand All @@ -83,6 +86,7 @@ def decorated_function(
tracer,
channel,
properties,
destination=exchange if exchange else routing_key,
span_kind=SpanKind.PRODUCER,
task_name="(temporary)",
operation=None,
Expand All @@ -107,6 +111,7 @@ def _get_span(
channel: Channel,
properties: BasicProperties,
task_name: str,
destination: str,
span_kind: SpanKind,
operation: Optional[MessagingOperationValues] = None,
) -> Optional[Span]:
Expand All @@ -116,7 +121,7 @@ def _get_span(
return None
task_name = properties.type if properties.type else task_name
span = tracer.start_span(
name=_generate_span_name("pika", operation), kind=span_kind,
name=_generate_span_name(destination, operation), kind=span_kind,
)
if span.is_recording():
_enrich_span(span, channel, properties, task_name, operation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.
from unittest import TestCase, mock

from pika.adapters import BaseConnection, BlockingConnection
from pika.adapters import BlockingConnection
from pika.channel import Channel
from wrapt import BoundFunctionWrapper

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import typing
from unittest import TestCase, mock

from pika.channel import Channel
Expand Down Expand Up @@ -38,9 +39,10 @@ def test_get_span(
channel = mock.MagicMock()
properties = mock.MagicMock()
task_name = "test.test"
destination = "myqueue"
span_kind = mock.MagicMock(spec=SpanKind)
get_value.return_value = None
_ = utils._get_span(tracer, channel, properties, task_name, span_kind)
_ = utils._get_span(tracer, channel, properties, task_name, destination, span_kind)
generate_span_name.assert_called_once()
tracer.start_span.assert_called_once_with(
name=generate_span_name.return_value, kind=span_kind
Expand Down Expand Up @@ -182,6 +184,7 @@ def test_decorate_callback(
tracer = mock.MagicMock()
channel = mock.MagicMock(spec=Channel)
method = mock.MagicMock(spec=Basic.Deliver)
method.exchange = "test_exchange"
properties = mock.MagicMock()
mock_body = b"mock_body"
decorated_callback = utils._decorate_callback(
Expand All @@ -195,6 +198,7 @@ def test_decorate_callback(
tracer,
channel,
properties,
destination=method.exchange,
span_kind=SpanKind.CONSUMER,
task_name=mock_task_name,
operation=MessagingOperationValues.RECEIVE,
Expand All @@ -219,19 +223,21 @@ def test_decorate_basic_publish(
callback = mock.MagicMock()
tracer = mock.MagicMock()
channel = mock.MagicMock(spec=Channel)
method = mock.MagicMock(spec=Basic.Deliver)
exchange_name = "test-exchange"
routing_key = "test-routing-key"
properties = mock.MagicMock()
mock_body = b"mock_body"
decorated_basic_publish = utils._decorate_basic_publish(
callback, channel, tracer
)
retval = decorated_basic_publish(
channel, method, mock_body, properties
exchange_name, routing_key, mock_body, properties
)
get_span.assert_called_once_with(
tracer,
channel,
properties,
destination=exchange_name,
span_kind=SpanKind.PRODUCER,
task_name="(temporary)",
operation=None,
Expand All @@ -242,7 +248,7 @@ def test_decorate_basic_publish(
get_span.return_value.is_recording.assert_called_once()
inject.assert_called_once_with(properties.headers)
callback.assert_called_once_with(
channel, method, mock_body, properties, False
exchange_name, routing_key, mock_body, properties, False
)
self.assertEqual(retval, callback.return_value)

Expand Down Expand Up @@ -273,3 +279,28 @@ def test_decorate_basic_publish_no_properties(
get_span.return_value.is_recording.assert_called_once()
inject.assert_called_once_with(basic_properties.return_value.headers)
self.assertEqual(retval, callback.return_value)

@mock.patch("opentelemetry.instrumentation.pika.utils._get_span")
def test_decorate_basic_publish_published_message_to_queue(
self,
get_span: mock.MagicMock,
) -> None:
callback = mock.MagicMock()
tracer = mock.MagicMock()
channel = mock.MagicMock(spec=Channel)
exchange_name = ""
routing_key = "test-routing-key"
properties = mock.MagicMock()
mock_body = b"mock_body"
decorated_basic_publish = utils._decorate_basic_publish(callback, channel, tracer)
decorated_basic_publish(exchange_name, routing_key, mock_body, properties)

get_span.assert_called_once_with(
tracer,
channel,
properties,
destination=routing_key,
span_kind=SpanKind.PRODUCER,
task_name="(temporary)",
operation=None,
)
oxeye-nikolay marked this conversation as resolved.
Show resolved Hide resolved