diff --git a/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py b/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py index 019369b42e..ef55869107 100644 --- a/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py @@ -25,11 +25,11 @@ def _extract_argument(key, position, default_value, args, kwargs): return kwargs.get(key, default_value) @staticmethod - def extract_send_topic(args): + def extract_send_topic(args, kwargs): """extract topic from `send` method arguments in KafkaProducer class""" - if len(args) > 0: - return args[0] - return "unknown" + return KafkaPropertiesExtractor._extract_argument( + "topic", 0, "unknown", args, kwargs + ) @staticmethod def extract_send_value(args, kwargs): @@ -56,7 +56,7 @@ def extract_send_headers(args, kwargs): def extract_send_partition(instance, args, kwargs): """extract partition `send` method arguments, using the `_partition` method in KafkaProducer class""" try: - topic = KafkaPropertiesExtractor.extract_send_topic(args) + topic = KafkaPropertiesExtractor.extract_send_topic(args, kwargs) key = KafkaPropertiesExtractor.extract_send_key(args, kwargs) value = KafkaPropertiesExtractor.extract_send_value(args, kwargs) partition = KafkaPropertiesExtractor._extract_argument( @@ -145,7 +145,7 @@ def _traced_send(func, instance, args, kwargs): headers = [] kwargs["headers"] = headers - topic = KafkaPropertiesExtractor.extract_send_topic(args) + topic = KafkaPropertiesExtractor.extract_send_topic(args, kwargs) bootstrap_servers = KafkaPropertiesExtractor.extract_bootstrap_servers( instance ) diff --git a/instrumentation/opentelemetry-instrumentation-kafka-python/tests/test_utils.py b/instrumentation/opentelemetry-instrumentation-kafka-python/tests/test_utils.py index b92f297248..0390163e04 100644 --- a/instrumentation/opentelemetry-instrumentation-kafka-python/tests/test_utils.py +++ b/instrumentation/opentelemetry-instrumentation-kafka-python/tests/test_utils.py @@ -19,6 +19,14 @@ def setUp(self) -> None: self.headers = [] self.kwargs = {"partition": 0, "headers": self.headers} + def test_wrap_send_with_topic_as_arg(self): + self.wrap_send_helper(self.args, self.kwargs) + + def test_wrap_send_with_topic_as_kwarg(self): + kwargs = self.kwargs.copy() + kwargs['topic'] = self.topic_name + self.wrap_send_helper([], kwargs) + @mock.patch( "opentelemetry.instrumentation.kafka.utils.KafkaPropertiesExtractor.extract_bootstrap_servers" ) @@ -28,8 +36,10 @@ def setUp(self) -> None: @mock.patch("opentelemetry.instrumentation.kafka.utils._enrich_span") @mock.patch("opentelemetry.trace.set_span_in_context") @mock.patch("opentelemetry.propagate.inject") - def test_wrap_send( + def wrap_send_helper( self, + send_args: [str], + send_kwargs: dict, inject: mock.MagicMock, set_span_in_context: mock.MagicMock, enrich_span: mock.MagicMock, @@ -44,12 +54,12 @@ def test_wrap_send( wrapped_send = _wrap_send(tracer, produce_hook) retval = wrapped_send( - original_send_callback, kafka_producer, self.args, self.kwargs + original_send_callback, kafka_producer, send_args, send_kwargs ) extract_bootstrap_servers.assert_called_once_with(kafka_producer) extract_send_partition.assert_called_once_with( - kafka_producer, self.args, self.kwargs + kafka_producer, send_args, send_kwargs ) tracer.start_as_current_span.assert_called_once_with( expected_span_name, kind=SpanKind.PRODUCER @@ -69,10 +79,10 @@ def test_wrap_send( self.headers, context=context, setter=_kafka_setter ) - produce_hook.assert_called_once_with(span, self.args, self.kwargs) + produce_hook.assert_called_once_with(span, send_args, send_kwargs) original_send_callback.assert_called_once_with( - *self.args, **self.kwargs + *send_args, **send_kwargs ) self.assertEqual(retval, original_send_callback.return_value)