From da0333dc0f051e6683893b73ced61b6b34d4dc25 Mon Sep 17 00:00:00 2001 From: Varsha GS Date: Thu, 11 Sep 2025 14:16:56 +0530 Subject: [PATCH] fix: suppression propagation in kafka Signed-off-by: Varsha GS --- src/instana/propagators/kafka_propagator.py | 38 +++++++++++++++------ 1 file changed, 27 insertions(+), 11 deletions(-) diff --git a/src/instana/propagators/kafka_propagator.py b/src/instana/propagators/kafka_propagator.py index 9ba27940..97bae58c 100644 --- a/src/instana/propagators/kafka_propagator.py +++ b/src/instana/propagators/kafka_propagator.py @@ -1,15 +1,12 @@ # (c) Copyright IBM Corp. 2025 -from typing import TYPE_CHECKING, Any, Dict, Optional +from typing import Any, Dict, Optional from opentelemetry.trace.span import format_span_id from instana.log import logger from instana.propagators.base_propagator import BasePropagator, CarrierT from instana.util.ids import hex_id_limited - -if TYPE_CHECKING: - from instana.span_context import SpanContext - +from instana.span_context import SpanContext class KafkaPropagator(BasePropagator): """ @@ -53,7 +50,7 @@ def extract_carrier_headers(self, carrier: CarrierT) -> Dict[str, Any]: def extract( self, carrier: CarrierT, disable_w3c_trace_context: bool = False - ) -> Optional["SpanContext"]: + ) -> Optional[SpanContext]: """ This method overrides one of the Base classes as with the introduction of W3C trace context for the Kafka requests more extracting steps and @@ -64,7 +61,7 @@ def extract( disable_w3c_trace_context (bool): A flag to disable the W3C trace context. Returns: - Optional["SpanContext"]: The extracted span context or None. + Optional[SpanContext]: The extracted span context or None. """ try: headers = self.extract_carrier_headers(carrier=carrier) @@ -79,7 +76,7 @@ def extract( # Assisted by watsonx Code Assistant def inject( self, - span_context: "SpanContext", + span_context: SpanContext, carrier: CarrierT, disable_w3c_trace_context: bool = True, ) -> None: @@ -103,7 +100,26 @@ def inject( # Suppression `level` made in the child context or in the parent context # has priority over any non-suppressed `level` setting suppression_level = int(self.extract_instana_headers(dictionary_carrier)[2]) - span_context.level = min(suppression_level, span_context.level) + new_level = min(suppression_level, span_context.level) + + if new_level != span_context.level: + # Create a new span context with the updated level + span_context = SpanContext( + trace_id=span_context.trace_id, + span_id=span_context.span_id, + is_remote=span_context.is_remote, + trace_flags=span_context.trace_flags, + trace_state=span_context.trace_state, + level=new_level, + synthetic=span_context.synthetic, + trace_parent=span_context.trace_parent, + instana_ancestor=span_context.instana_ancestor, + long_trace_id=span_context.long_trace_id, + correlation_type=span_context.correlation_type, + correlation_id=span_context.correlation_id, + traceparent=span_context.traceparent, + tracestate=span_context.tracestate + ) def inject_key_value(carrier, key, value): if isinstance(carrier, list): @@ -119,9 +135,9 @@ def inject_key_value(carrier, key, value): inject_key_value( carrier, self.KAFKA_HEADER_KEY_L_S, - str(suppression_level).encode("utf-8"), + str(span_context.level).encode("utf-8"), ) - if suppression_level == 1: + if span_context.level == 1: inject_key_value( carrier, self.KAFKA_HEADER_KEY_T,