Skip to content

Commit cadb262

Browse files
committed
fix: IndexError in confluent_kafka_python.py
Fixed an IndexError in `confluent_kafka_python.py` by handling both positional and keyword arguments for the topic parameter in the `trace_kafka_produce` function. The issue occurred when the topic was passed as a keyword argument: `args` was then an empty tuple, so accessing `args[0]` raised an IndexError.

The solution:
1. Modified `trace_kafka_produce` to read the topic from either `args` or `kwargs`.
2. Added safety checks to handle edge cases (missing topic, `headers=None`).
3. Added two new test methods to verify the fix works with different argument patterns:
   - `test_trace_confluent_kafka_produce_with_keyword_topic`
   - `test_trace_confluent_kafka_produce_with_keyword_args`

This fix ensures that the Kafka instrumentation works correctly regardless of how the `produce` method is called, improving the robustness of the Python sensor.
1 parent e1641c0 commit cadb262

File tree

2 files changed

+85
-18
lines changed

2 files changed

+85
-18
lines changed

src/instana/instrumentation/kafka/confluent_kafka_python.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,8 @@
1414
from instana.log import logger
1515
from instana.propagators.format import Format
1616
from instana.singletons import get_tracer
17-
from instana.util.traceutils import (
18-
get_tracer_tuple,
19-
tracing_is_off,
20-
)
2117
from instana.span.span import InstanaSpan
18+
from instana.util.traceutils import get_tracer_tuple, tracing_is_off
2219

2320
consumer_token = None
2421
consumer_span = contextvars.ContextVar("confluent_kafka_consumer_span")
@@ -69,16 +66,26 @@ def trace_kafka_produce(
6966

7067
tracer, parent_span, _ = get_tracer_tuple()
7168
parent_context = parent_span.get_span_context() if parent_span else None
69+
70+
# Get the topic from either args or kwargs
71+
topic = args[0] if args else kwargs.get("topic")
72+
73+
# Handle the case where topic might not be available
74+
if topic is None:
75+
# Log a warning and proceed with the original function call
76+
logger.warning("Kafka produce called without a topic parameter")
77+
return wrapped(*args, **kwargs)
78+
7279
is_suppressed = tracer.exporter._HostAgent__is_endpoint_ignored(
7380
"kafka",
7481
"produce",
75-
args[0],
82+
topic,
7683
)
7784

7885
with tracer.start_as_current_span(
7986
"kafka-producer", span_context=parent_context, kind=SpanKind.PRODUCER
8087
) as span:
81-
span.set_attribute("kafka.service", args[0])
88+
span.set_attribute("kafka.service", topic)
8289
span.set_attribute("kafka.access", "produce")
8390

8491
# context propagation
@@ -89,6 +96,10 @@ def trace_kafka_produce(
8996
# dictionary. To maintain compatibility with the headers for the
9097
# Kafka Python library, we will use a list of tuples.
9198
headers = args[6] if len(args) > 6 else kwargs.get("headers", [])
99+
100+
# Initialize headers if it's None
101+
if headers is None:
102+
headers = []
92103
suppression_header = {"x_instana_l_s": "0" if is_suppressed else "1"}
93104
headers.append(suppression_header)
94105

tests/clients/kafka/test_confluent_kafka.py

Lines changed: 68 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,30 +5,26 @@
55
from typing import Generator
66

77
import pytest
8-
from confluent_kafka import (
9-
Consumer,
10-
KafkaException,
11-
Producer,
12-
)
8+
from confluent_kafka import Consumer, KafkaException, Producer
139
from confluent_kafka.admin import AdminClient, NewTopic
14-
from mock import patch, Mock
10+
from mock import Mock, patch
1511
from opentelemetry.trace import SpanKind
1612
from opentelemetry.trace.span import format_span_id
1713

1814
from instana.configurator import config
19-
from instana.options import StandardOptions
20-
from instana.singletons import agent, tracer
21-
from instana.util.config import parse_ignored_endpoints_from_yaml
22-
from tests.helpers import get_first_span_by_filter, testenv
2315
from instana.instrumentation.kafka import confluent_kafka_python
2416
from instana.instrumentation.kafka.confluent_kafka_python import (
2517
clear_context,
26-
save_consumer_span_into_context,
2718
close_consumer_span,
28-
trace_kafka_close,
2919
consumer_span,
20+
save_consumer_span_into_context,
21+
trace_kafka_close,
3022
)
23+
from instana.options import StandardOptions
24+
from instana.singletons import agent, tracer
3125
from instana.span.span import InstanaSpan
26+
from instana.util.config import parse_ignored_endpoints_from_yaml
27+
from tests.helpers import get_first_span_by_filter, testenv
3228

3329

3430
class TestConfluentKafka:
@@ -120,6 +116,66 @@ def test_trace_confluent_kafka_produce(self) -> None:
120116
assert kafka_span.data["kafka"]["service"] == testenv["kafka_topic"]
121117
assert kafka_span.data["kafka"]["access"] == "produce"
122118

119+
def test_trace_confluent_kafka_produce_with_keyword_topic(self) -> None:
120+
"""Test that tracing works when topic is passed as a keyword argument."""
121+
with tracer.start_as_current_span("test"):
122+
# Pass topic as a keyword argument
123+
self.producer.produce(topic=testenv["kafka_topic"], value=b"raw_bytes")
124+
self.producer.flush(timeout=10)
125+
126+
spans = self.recorder.queued_spans()
127+
assert len(spans) == 2
128+
129+
kafka_span = spans[0]
130+
test_span = spans[1]
131+
132+
# Same traceId
133+
assert test_span.t == kafka_span.t
134+
135+
# Parent relationships
136+
assert kafka_span.p == test_span.s
137+
138+
# Error logging
139+
assert not test_span.ec
140+
assert not kafka_span.ec
141+
142+
assert kafka_span.n == "kafka"
143+
assert kafka_span.k == SpanKind.CLIENT
144+
assert kafka_span.data["kafka"]["service"] == testenv["kafka_topic"]
145+
assert kafka_span.data["kafka"]["access"] == "produce"
146+
147+
def test_trace_confluent_kafka_produce_with_keyword_args(self) -> None:
148+
"""Test that tracing works when both topic and headers are passed as keyword arguments."""
149+
with tracer.start_as_current_span("test"):
150+
# Pass both topic and headers as keyword arguments
151+
self.producer.produce(
152+
topic=testenv["kafka_topic"],
153+
value=b"raw_bytes",
154+
headers=[("custom-header", b"header-value")],
155+
)
156+
self.producer.flush(timeout=10)
157+
158+
spans = self.recorder.queued_spans()
159+
assert len(spans) == 2
160+
161+
kafka_span = spans[0]
162+
test_span = spans[1]
163+
164+
# Same traceId
165+
assert test_span.t == kafka_span.t
166+
167+
# Parent relationships
168+
assert kafka_span.p == test_span.s
169+
170+
# Error logging
171+
assert not test_span.ec
172+
assert not kafka_span.ec
173+
174+
assert kafka_span.n == "kafka"
175+
assert kafka_span.k == SpanKind.CLIENT
176+
assert kafka_span.data["kafka"]["service"] == testenv["kafka_topic"]
177+
assert kafka_span.data["kafka"]["access"] == "produce"
178+
123179
def test_trace_confluent_kafka_consume(self) -> None:
124180
agent.options.set_trace_configurations()
125181
# Produce some events

0 commit comments

Comments (0)