From 16feb85dc3e17bfa908fe61bc239fb404015acf4 Mon Sep 17 00:00:00 2001 From: Ran Nozik Date: Mon, 7 Mar 2022 20:56:39 +0200 Subject: [PATCH] support topic as kwarg --- CHANGELOG.md | 2 + .../instrumentation/kafka/utils.py | 12 ++--- .../tests/test_utils.py | 47 ++++++++++++++++++- 3 files changed, 53 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a67773c5e..3f659d761e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#903](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/903)) - `opentelemetry-instrumentation-falcon` Safer patching mechanism ([#895](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/895)) +- `opentelemetry-instrumentation-kafka-python` Fix topic extraction + ([#949](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/949)) ## [1.9.1-0.28b1](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.9.1-0.28b1) - 2022-01-29 diff --git a/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py b/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py index 019369b42e..ef55869107 100644 --- a/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py @@ -25,11 +25,11 @@ def _extract_argument(key, position, default_value, args, kwargs): return kwargs.get(key, default_value) @staticmethod - def extract_send_topic(args): + def extract_send_topic(args, kwargs): """extract topic from `send` method arguments in KafkaProducer class""" - if len(args) > 0: - return args[0] - return "unknown" + return KafkaPropertiesExtractor._extract_argument( + "topic", 0, "unknown", args, kwargs + ) @staticmethod def extract_send_value(args, kwargs): @@ -56,7 +56,7 @@ def extract_send_headers(args, kwargs): def extract_send_partition(instance, args, kwargs): """extract partition `send` method arguments, using the `_partition` method in KafkaProducer class""" try: - topic = KafkaPropertiesExtractor.extract_send_topic(args) + topic = KafkaPropertiesExtractor.extract_send_topic(args, kwargs) key = KafkaPropertiesExtractor.extract_send_key(args, kwargs) value = KafkaPropertiesExtractor.extract_send_value(args, kwargs) partition = KafkaPropertiesExtractor._extract_argument( @@ -145,7 +145,7 @@ def _traced_send(func, instance, args, kwargs): headers = [] kwargs["headers"] = headers - topic = KafkaPropertiesExtractor.extract_send_topic(args) + topic = KafkaPropertiesExtractor.extract_send_topic(args, kwargs) bootstrap_servers = KafkaPropertiesExtractor.extract_bootstrap_servers( instance ) diff --git a/instrumentation/opentelemetry-instrumentation-kafka-python/tests/test_utils.py b/instrumentation/opentelemetry-instrumentation-kafka-python/tests/test_utils.py index b92f297248..74d359bfde 100644 --- a/instrumentation/opentelemetry-instrumentation-kafka-python/tests/test_utils.py +++ b/instrumentation/opentelemetry-instrumentation-kafka-python/tests/test_utils.py @@ -28,14 +28,57 @@ def setUp(self) -> None: @mock.patch("opentelemetry.instrumentation.kafka.utils._enrich_span") @mock.patch("opentelemetry.trace.set_span_in_context") @mock.patch("opentelemetry.propagate.inject") - def test_wrap_send( + def test_wrap_send_with_topic_as_arg( self, inject: mock.MagicMock, set_span_in_context: mock.MagicMock, enrich_span: mock.MagicMock, extract_send_partition: mock.MagicMock, extract_bootstrap_servers: mock.MagicMock, - ): + ) -> None: + self.wrap_send_helper( + inject, + set_span_in_context, + enrich_span, + extract_send_partition, + extract_bootstrap_servers, + ) + + @mock.patch( + "opentelemetry.instrumentation.kafka.utils.KafkaPropertiesExtractor.extract_bootstrap_servers" + ) + @mock.patch( + "opentelemetry.instrumentation.kafka.utils.KafkaPropertiesExtractor.extract_send_partition" + ) + @mock.patch("opentelemetry.instrumentation.kafka.utils._enrich_span") + @mock.patch("opentelemetry.trace.set_span_in_context") + @mock.patch("opentelemetry.propagate.inject") + def test_wrap_send_with_topic_as_kwarg( + self, + inject: mock.MagicMock, + set_span_in_context: mock.MagicMock, + enrich_span: mock.MagicMock, + extract_send_partition: mock.MagicMock, + extract_bootstrap_servers: mock.MagicMock, + ) -> None: + self.args = [] + self.kwargs["topic"] = self.topic_name + self.wrap_send_helper( + inject, + set_span_in_context, + enrich_span, + extract_send_partition, + extract_bootstrap_servers, + ) + + def wrap_send_helper( + self, + inject: mock.MagicMock, + set_span_in_context: mock.MagicMock, + enrich_span: mock.MagicMock, + extract_send_partition: mock.MagicMock, + extract_bootstrap_servers: mock.MagicMock, + ) -> None: tracer = mock.MagicMock() produce_hook = mock.MagicMock() original_send_callback = mock.MagicMock()