Skip to content

Commit 3a8603f

Browse files
committed
fix: do not create spans when confluent-kafka.poll() response is empty
Signed-off-by: Cagri Yonca <cagri@ibm.com>
1 parent aff617e commit 3a8603f

File tree

2 files changed

+304
-2
lines changed

2 files changed

+304
-2
lines changed

src/instana/instrumentation/kafka/confluent_kafka_python.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,8 @@ def trace_kafka_poll(
251251

252252
try:
253253
res = wrapped(*args, **kwargs)
254-
create_span("poll", res.topic(), res.headers())
254+
if res:
255+
create_span("poll", res.topic(), res.headers())
255256
return res
256257
except Exception as exc:
257258
exception = exc

tests/clients/kafka/test_confluent_kafka.py

Lines changed: 302 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@
22

33

44
import os
5+
import threading
56
import time
6-
from typing import Generator
7+
from typing import Generator, List
78

89
import pytest
910
from confluent_kafka import Consumer, KafkaException, Producer
@@ -775,3 +776,303 @@ def test_trace_kafka_close_exception_handling(self, span: "InstanaSpan") -> None
775776

776777
# Verify span was ended
777778
assert not span.is_recording()
779+
780+
def test_confluent_kafka_poll_returns_none(self) -> None:
781+
consumer_config = self.kafka_config.copy()
782+
consumer_config["group.id"] = "test-empty-poll-group"
783+
consumer_config["auto.offset.reset"] = "earliest"
784+
785+
consumer = Consumer(consumer_config)
786+
consumer.subscribe([testenv["kafka_topic"] + "_3"])
787+
788+
with self.tracer.start_as_current_span("test"):
789+
msg = consumer.poll(timeout=0.1)
790+
791+
assert msg is None
792+
793+
consumer.close()
794+
795+
spans = self.recorder.queued_spans()
796+
797+
assert len(spans) == 1
798+
test_span = spans[0]
799+
assert test_span.n == "sdk"
800+
assert test_span.data["sdk"]["name"] == "test"
801+
802+
def test_confluent_kafka_poll_returns_none_with_context_cleanup(self) -> None:
803+
consumer_config = self.kafka_config.copy()
804+
consumer_config["group.id"] = "test-context-cleanup-group"
805+
consumer_config["auto.offset.reset"] = "earliest"
806+
807+
consumer = Consumer(consumer_config)
808+
consumer.subscribe([testenv["kafka_topic"] + "_3"])
809+
810+
# Consume any existing messages to ensure topic is empty
811+
while True:
812+
msg = consumer.poll(timeout=0.5)
813+
if msg is None:
814+
break
815+
816+
# Clear any spans created during cleanup
817+
self.recorder.clear_spans()
818+
819+
with self.tracer.start_as_current_span("test"):
820+
for _ in range(3):
821+
msg = consumer.poll(timeout=0.1)
822+
assert msg is None
823+
824+
consumer.close()
825+
826+
spans = self.recorder.queued_spans()
827+
assert len(spans) == 1
828+
test_span = spans[0]
829+
assert test_span.n == "sdk"
830+
831+
def test_confluent_kafka_poll_none_then_message(self) -> None:
832+
# First, create a temporary consumer to clean up any existing messages
833+
cleanup_config = self.kafka_config.copy()
834+
cleanup_config["group.id"] = "test-none-then-message-cleanup"
835+
cleanup_config["auto.offset.reset"] = "earliest"
836+
837+
cleanup_consumer = Consumer(cleanup_config)
838+
cleanup_consumer.subscribe([testenv["kafka_topic"] + "_3"])
839+
840+
# Consume any existing messages
841+
while True:
842+
msg = cleanup_consumer.poll(timeout=0.5)
843+
if msg is None:
844+
break
845+
846+
cleanup_consumer.close()
847+
848+
# Clear any spans created during cleanup
849+
self.recorder.clear_spans()
850+
851+
# Now run the actual test with a fresh consumer
852+
consumer_config = self.kafka_config.copy()
853+
consumer_config["group.id"] = "test-none-then-message-group"
854+
consumer_config["auto.offset.reset"] = "earliest"
855+
856+
consumer = Consumer(consumer_config)
857+
consumer.subscribe([testenv["kafka_topic"] + "_3"])
858+
859+
with self.tracer.start_as_current_span("test"):
860+
msg1 = consumer.poll(timeout=0.1)
861+
assert msg1 is None
862+
863+
self.producer.produce(testenv["kafka_topic"] + "_3", b"test_message")
864+
self.producer.flush(timeout=10)
865+
866+
msg2 = consumer.poll(timeout=5)
867+
assert msg2 is not None
868+
assert msg2.value() == b"test_message"
869+
870+
consumer.close()
871+
872+
spans = self.recorder.queued_spans()
873+
assert len(spans) == 3
874+
875+
kafka_span = get_first_span_by_filter(
876+
spans,
877+
lambda span: span.n == "kafka" and span.data["kafka"]["access"] == "poll",
878+
)
879+
assert kafka_span is not None
880+
assert kafka_span.data["kafka"]["service"] == testenv["kafka_topic"] + "_3"
881+
882+
kafka_span = get_first_span_by_filter(
883+
spans,
884+
lambda span: span.n == "kafka"
885+
and span.data["kafka"]["access"] == "produce",
886+
)
887+
assert kafka_span is not None
888+
assert kafka_span.data["kafka"]["service"] == testenv["kafka_topic"] + "_3"
889+
890+
def test_confluent_kafka_poll_multithreaded_context_isolation(self) -> None:
891+
agent.options.allow_exit_as_root = True
892+
agent.options.set_trace_configurations()
893+
894+
# Produce messages to multiple topics
895+
num_threads = 3
896+
messages_per_topic = 2
897+
898+
for i in range(num_threads):
899+
topic = f"{testenv['kafka_topic']}_thread_{i}"
900+
# Create topic
901+
try:
902+
self.kafka_client.create_topics(
903+
[NewTopic(topic, num_partitions=1, replication_factor=1)]
904+
)
905+
except KafkaException:
906+
pass
907+
908+
# Produce messages
909+
for j in range(messages_per_topic):
910+
self.producer.produce(topic, f"message_{j}".encode())
911+
912+
self.producer.flush(timeout=10)
913+
time.sleep(1) # Allow messages to be available
914+
915+
# Track results from each thread
916+
thread_results: List[dict] = []
917+
thread_errors: List[Exception] = []
918+
lock = threading.Lock()
919+
920+
def consume_from_topic(thread_id: int) -> None:
921+
try:
922+
topic = f"{testenv['kafka_topic']}_thread_{thread_id}"
923+
consumer_config = self.kafka_config.copy()
924+
consumer_config["group.id"] = f"test-multithread-group-{thread_id}"
925+
consumer_config["auto.offset.reset"] = "earliest"
926+
927+
consumer = Consumer(consumer_config)
928+
consumer.subscribe([topic])
929+
930+
messages_consumed = 0
931+
none_polls = 0
932+
max_polls = 10
933+
934+
with self.tracer.start_as_current_span(f"thread-{thread_id}"):
935+
for _ in range(max_polls):
936+
msg = consumer.poll(timeout=1.0)
937+
938+
if msg is None:
939+
none_polls += 1
940+
_ = consumer_span.get(None)
941+
else:
942+
if msg.error():
943+
continue
944+
messages_consumed += 1
945+
946+
assert msg.topic() == topic
947+
948+
if messages_consumed >= messages_per_topic:
949+
break
950+
951+
consumer.close()
952+
953+
with lock:
954+
thread_results.append(
955+
{
956+
"thread_id": thread_id,
957+
"topic": topic,
958+
"messages_consumed": messages_consumed,
959+
"none_polls": none_polls,
960+
"success": True,
961+
}
962+
)
963+
964+
except Exception as e:
965+
with lock:
966+
thread_errors.append(e)
967+
thread_results.append(
968+
{"thread_id": thread_id, "success": False, "error": str(e)}
969+
)
970+
971+
threads = []
972+
for i in range(num_threads):
973+
thread = threading.Thread(target=consume_from_topic, args=(i,))
974+
threads.append(thread)
975+
thread.start()
976+
977+
for thread in threads:
978+
thread.join(timeout=30)
979+
980+
assert len(thread_errors) == 0, f"Errors in threads: {thread_errors}"
981+
982+
assert len(thread_results) == num_threads
983+
for result in thread_results:
984+
assert result[
985+
"success"
986+
], f"Thread {result['thread_id']} failed: {result.get('error')}"
987+
assert (
988+
result["messages_consumed"] == messages_per_topic
989+
), f"Thread {result['thread_id']} consumed {result['messages_consumed']} messages, expected {messages_per_topic}"
990+
991+
spans = self.recorder.queued_spans()
992+
993+
expected_min_spans = num_threads * (1 + messages_per_topic * 2)
994+
assert (
995+
len(spans) >= expected_min_spans
996+
), f"Expected at least {expected_min_spans} spans, got {len(spans)}"
997+
998+
for i in range(num_threads):
999+
topic = f"{testenv['kafka_topic']}_thread_{i}"
1000+
1001+
poll_spans = [
1002+
s
1003+
for s in spans
1004+
if s.n == "kafka"
1005+
and s.data.get("kafka", {}).get("access") == "poll"
1006+
and s.data.get("kafka", {}).get("service") == topic
1007+
]
1008+
1009+
assert (
1010+
len(poll_spans) >= 1
1011+
), f"Expected poll spans for topic {topic}, got {len(poll_spans)}"
1012+
1013+
topics_to_delete = [
1014+
f"{testenv['kafka_topic']}_thread_{i}" for i in range(num_threads)
1015+
]
1016+
self.kafka_client.delete_topics(topics_to_delete)
1017+
time.sleep(1)
1018+
1019+
def test_confluent_kafka_poll_multithreaded_with_none_returns(self) -> None:
1020+
num_threads = 5
1021+
1022+
thread_errors: List[Exception] = []
1023+
lock = threading.Lock()
1024+
1025+
def poll_empty_topic(thread_id: int) -> None:
1026+
try:
1027+
consumer_config = self.kafka_config.copy()
1028+
consumer_config["group.id"] = f"test-empty-poll-{thread_id}"
1029+
consumer_config["auto.offset.reset"] = "earliest"
1030+
1031+
consumer = Consumer(consumer_config)
1032+
consumer.subscribe([testenv["kafka_topic"] + "_3"])
1033+
1034+
# Consume any existing messages to ensure topic is empty
1035+
while True:
1036+
msg = consumer.poll(timeout=0.5)
1037+
if msg is None:
1038+
break
1039+
1040+
with self.tracer.start_as_current_span(
1041+
f"empty-poll-thread-{thread_id}"
1042+
):
1043+
for _ in range(5):
1044+
msg = consumer.poll(timeout=0.1)
1045+
assert msg is None, "Expected None from empty topic"
1046+
1047+
time.sleep(0.01)
1048+
1049+
consumer.close()
1050+
1051+
except Exception as e:
1052+
with lock:
1053+
thread_errors.append(e)
1054+
1055+
threads = []
1056+
for i in range(num_threads):
1057+
thread = threading.Thread(target=poll_empty_topic, args=(i,))
1058+
threads.append(thread)
1059+
thread.start()
1060+
1061+
for thread in threads:
1062+
thread.join(timeout=10)
1063+
1064+
assert (
1065+
len(thread_errors) == 0
1066+
), f"Context errors in threads: {[str(e) for e in thread_errors]}"
1067+
1068+
spans = self.recorder.queued_spans()
1069+
1070+
test_spans = [s for s in spans if s.n == "sdk"]
1071+
assert (
1072+
len(test_spans) == num_threads
1073+
), f"Expected {num_threads} test spans, got {len(test_spans)}"
1074+
1075+
kafka_spans = [s for s in spans if s.n == "kafka"]
1076+
assert (
1077+
len(kafka_spans) == 0
1078+
), f"Expected no kafka spans for None polls, got {len(kafka_spans)}"

0 commit comments

Comments (0)