Skip to content

Commit

Permalink
support topic as kwarg
Browse files Browse the repository at this point in the history
  • Loading branch information
nozik committed Mar 7, 2022
1 parent 2ab6641 commit 75660e0
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ def _extract_argument(key, position, default_value, args, kwargs):
return kwargs.get(key, default_value)

@staticmethod
def extract_send_topic(args):
def extract_send_topic(args, kwargs):
"""extract topic from `send` method arguments in KafkaProducer class"""
if len(args) > 0:
return args[0]
return "unknown"
return KafkaPropertiesExtractor._extract_argument(
"topic", 0, "unknown", args, kwargs
)

@staticmethod
def extract_send_value(args, kwargs):
Expand All @@ -56,7 +56,7 @@ def extract_send_headers(args, kwargs):
def extract_send_partition(instance, args, kwargs):
"""extract partition `send` method arguments, using the `_partition` method in KafkaProducer class"""
try:
topic = KafkaPropertiesExtractor.extract_send_topic(args)
topic = KafkaPropertiesExtractor.extract_send_topic(args, kwargs)
key = KafkaPropertiesExtractor.extract_send_key(args, kwargs)
value = KafkaPropertiesExtractor.extract_send_value(args, kwargs)
partition = KafkaPropertiesExtractor._extract_argument(
Expand Down Expand Up @@ -145,7 +145,7 @@ def _traced_send(func, instance, args, kwargs):
headers = []
kwargs["headers"] = headers

topic = KafkaPropertiesExtractor.extract_send_topic(args)
topic = KafkaPropertiesExtractor.extract_send_topic(args, kwargs)
bootstrap_servers = KafkaPropertiesExtractor.extract_bootstrap_servers(
instance
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ def setUp(self) -> None:
self.headers = []
self.kwargs = {"partition": 0, "headers": self.headers}

def test_wrap_send_with_topic_as_arg(self):
self.wrap_send_helper(self.args, self.kwargs)

def test_wrap_send_with_topic_as_kwarg(self):
kwargs = self.kwargs.copy()
kwargs['topic'] = self.topic_name
self.wrap_send_helper([], kwargs)

@mock.patch(
"opentelemetry.instrumentation.kafka.utils.KafkaPropertiesExtractor.extract_bootstrap_servers"
)
Expand All @@ -28,8 +36,10 @@ def setUp(self) -> None:
@mock.patch("opentelemetry.instrumentation.kafka.utils._enrich_span")
@mock.patch("opentelemetry.trace.set_span_in_context")
@mock.patch("opentelemetry.propagate.inject")
def test_wrap_send(
def wrap_send_helper(
self,
send_args: [str],
send_kwargs: dict,
inject: mock.MagicMock,
set_span_in_context: mock.MagicMock,
enrich_span: mock.MagicMock,
Expand All @@ -44,12 +54,12 @@ def test_wrap_send(

wrapped_send = _wrap_send(tracer, produce_hook)
retval = wrapped_send(
original_send_callback, kafka_producer, self.args, self.kwargs
original_send_callback, kafka_producer, send_args, send_kwargs
)

extract_bootstrap_servers.assert_called_once_with(kafka_producer)
extract_send_partition.assert_called_once_with(
kafka_producer, self.args, self.kwargs
kafka_producer, send_args, send_kwargs
)
tracer.start_as_current_span.assert_called_once_with(
expected_span_name, kind=SpanKind.PRODUCER
Expand All @@ -69,10 +79,10 @@ def test_wrap_send(
self.headers, context=context, setter=_kafka_setter
)

produce_hook.assert_called_once_with(span, self.args, self.kwargs)
produce_hook.assert_called_once_with(span, send_args, send_kwargs)

original_send_callback.assert_called_once_with(
*self.args, **self.kwargs
*send_args, **send_kwargs
)
self.assertEqual(retval, original_send_callback.return_value)

Expand Down

0 comments on commit 75660e0

Please sign in to comment.