
In kafka events, the traceId and spanId are not written in the logs #3146

Closed
oooximionnn opened this issue Mar 20, 2024 · 12 comments · Fixed by #3148

Comments

@oooximionnn

Describe the bug
In kafka events, the traceId and spanId are not written in the logs

Environment
Java 17. Spring boot starter 3.2.2
We use the following dependencies for tracing:

        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-tracing</artifactId>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-tracing-bridge-otel</artifactId>
        </dependency>
        <dependency>
            <groupId>io.opentelemetry</groupId>
            <artifactId>opentelemetry-exporter-otlp</artifactId>
        </dependency>

Kafka Template Bean:

import io.micrometer.common.KeyValues;
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.micrometer.KafkaRecordSenderContext;
import org.springframework.kafka.support.micrometer.KafkaTemplateObservationConvention;


@Configuration
@ComponentScan
@RequiredArgsConstructor
public class KafkaConfig {
    @Bean
    public KafkaTemplate<Object, Object> kafkaTemplate(final ProducerFactory<Object, Object> producerFactory) {
        final KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory);
        kafkaTemplate.setObservationEnabled(true);
        kafkaTemplate.setObservationConvention(new KafkaTemplateObservationConvention() {
            @Override
            public KeyValues getLowCardinalityKeyValues(KafkaRecordSenderContext context) {
                return KeyValues.of("topic", context.getDestination(),
                        "id", String.valueOf(context.getRecord().key()));
            }
        });
        return kafkaTemplate;
    }
}

Context-propagation configuration (for @Async):

import io.micrometer.context.ContextExecutorService;
import io.micrometer.context.ContextScheduledExecutorService;
import io.micrometer.context.ContextSnapshot;
import io.micrometer.context.ContextSnapshotFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;

@Configuration(proxyBeanMethods = false)
class CommonConfiguration {

    // Example of Async Servlets setup
    @Configuration(proxyBeanMethods = false)
    @EnableAsync
    static class AsyncConfig implements AsyncConfigurer, WebMvcConfigurer {
        @Override
        public Executor getAsyncExecutor() {
            return ContextExecutorService.wrap(Executors.newCachedThreadPool(), ContextSnapshot::captureAll);
        }

        @Override
        public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
            configurer.setTaskExecutor(new SimpleAsyncTaskExecutor(r -> new Thread(ContextSnapshotFactory.builder().build().captureAll().wrap(r))));
        }
    }


    /**
     * NAME OF THE BEAN IS IMPORTANT!
     * <p>
     * We need to wrap this for @Async related things to propagate the context.
     *
     * @see EnableAsync
     */
    // [Observability] instrumenting executors
    @Bean(name = "taskExecutor", destroyMethod = "shutdown")
    ThreadPoolTaskScheduler threadPoolTaskScheduler() {
        ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler() {
            @Override
            protected ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
                ExecutorService executorService = super.initializeExecutor(threadFactory, rejectedExecutionHandler);
                return ContextExecutorService.wrap(executorService, ContextSnapshot::captureAll);
            }


            @Override
            public ScheduledExecutorService getScheduledExecutor() throws IllegalStateException {
                return ContextScheduledExecutorService.wrap(super.getScheduledExecutor());
            }
        };
        threadPoolTaskScheduler.initialize();
        return threadPoolTaskScheduler;
    }
}

application.yml file

logging:
  pattern:
    console: "%d{yyyy-MM-dd HH:mm:ss.SSS} %5p [${spring.application.name:-},%X{traceId:-},%X{spanId:-}] 0 --- [%15.80t] %-40.40logger{39} : %msg%n"

management:
  tracing:
    sampling:
      probability: 1.0
    propagation:
      produce: b3
      consume: b3
      type: b3,w3c

Producer code:

@Async
protected void sendMessage(Object message, String key, String topic) {
    kafkaTemplate.send(topic, key, message)
            .thenAccept(result -> {
                log.debug("producerListener -> Message was sent. msg: {}", result);
            })
            .exceptionally(ex -> {
                log.error("producerListener -> Error: {}", message, ex);
                return null;
            });
}

What's going on
In this case, traceId and spanId do end up in the Kafka record headers, but the logging pattern cannot resolve traceId and spanId from the MDC, and accordingly they are not written to the logs.
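The mechanism behind this is plain JDK behaviour rather than anything Kafka-specific: `CompletableFuture` callbacks may run on a different thread than the caller, and thread-local state such as the MDC does not follow the future across threads. A minimal, framework-free demo (class, method, and value names are ours, for illustration only):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalLossDemo {

    // Stands in for the MDC: a value visible only on the thread that set it.
    static final ThreadLocal<String> TRACE = new ThreadLocal<>();

    static String lostTraceValue() throws Exception {
        TRACE.set("abc123"); // "traceId" set on the calling thread
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // Both stages run on the pool thread, which never saw TRACE.set(...),
            // so the callback observes null -- just like the missing MDC entries.
            return CompletableFuture
                    .supplyAsync(() -> "sent", pool)
                    .thenApplyAsync(result -> TRACE.get(), pool)
                    .get();
        }
        finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(lostTraceValue()); // prints "null"
    }
}
```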

What we changed

@Async
protected void sendMessage(Object message, String key, String topic) {

    Map<String, String> copyOfContextMap = MDC.getCopyOfContextMap();

    kafkaTemplate.send(topic, key, message)
            .thenAccept(result -> {
                MDC.setContextMap(copyOfContextMap);
                log.debug("producerListener -> Message was sent. msg: {}", result);
            })
            .exceptionally(ex -> {
                MDC.setContextMap(copyOfContextMap);
                log.error("producerListener -> Error: {}", message, ex);
                return null;
            });
}

We added Map<String, String> copyOfContextMap = MDC.getCopyOfContextMap(); to the producer and restored it in each callback, and the traceId and spanId started appearing in the logs.

Is there a better solution that avoids copying the MDC context map by hand?
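One alternative to copying the MDC map by hand is Micrometer's context-propagation library (io.micrometer:context-propagation, pulled in transitively by micrometer-tracing). A sketch under that assumption, using the caller's thread to capture a `ContextSnapshot` and restoring it inside each callback; the scope restores all registered thread-local values, including the tracing context that backs the MDC:

    import io.micrometer.context.ContextSnapshot;
    import io.micrometer.context.ContextSnapshotFactory;

    // Capture the caller thread's thread-local context once, before send()
    ContextSnapshot snapshot = ContextSnapshotFactory.builder().build().captureAll();

    kafkaTemplate.send(topic, key, message)
            .thenAccept(result -> {
                try (ContextSnapshot.Scope scope = snapshot.setThreadLocals()) {
                    log.debug("producerListener -> Message was sent. msg: {}", result);
                }
            })
            .exceptionally(ex -> {
                try (ContextSnapshot.Scope scope = snapshot.setThreadLocals()) {
                    log.error("producerListener -> Error: {}", message, ex);
                }
                return null;
            });

This is a sketch, not a drop-in replacement: it still captures context explicitly at the call site, it just does so through the same abstraction the rest of the Micrometer instrumentation uses.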

We also used this sample: https://github.com/micrometer-metrics/micrometer-samples/blob/main/kafka-producer/src/main/java/com/example/micrometer/ManualConfiguration.java
It didn't help.

The problem is related to the migration to Spring Boot 3. On Spring Boot 2 the Sleuth library was used for tracing, and Kafka event tracing worked. But when we started using the examples and instructions from Micrometer, the trace id no longer appeared in the logs; it only appeared in the Kafka headers.

@artembilan
Member

Apparently this is a duplicate of micrometer-metrics/micrometer#4859.

Any chance you can share a simple project with us, where we can reproduce, debug and play with it?
Just to be sure that we are on the same page.

Thanks

@oooximionnn
Author

oooximionnn commented Mar 21, 2024

Here is simple example https://github.com/oooximionnn/KafkaTracing/
(screenshot: 2024-03-21_14-35-25)

trace id and span id are empty

@mkoralewski

I have a similar problem.
For ConcurrentKafkaListenerContainerFactory we set the CommonErrorHandler shown below:

  @Bean
  public DefaultErrorHandler errorHandler() {
    BackOff fixedBackOff = new FixedBackOff(0, 0);
    return new DefaultErrorHandler(
        (consumerRecord, exception) -> log.error("Failed to process ATG message from topic:{}. key:{}",
            consumerRecord.topic(), consumerRecord.key(), exception),
        fixedBackOff);
  }

When I was using Spring Boot 3.1.9, logs produced by this handler had MDC entries (traceId, spanId and one custom BaggageField). After upgrading to Spring Boot 3.2.4, the MDC entries are gone.

@artembilan
Member

@mkoralewski ,

it is not similar.
This issue is about KafkaTemplate and KafkaProducer; yours is about the consumer side.
Please consider raising a new issue with a reproducer.
Moreover, this issue is about a migration from Spring Boot 2.x to 3.x, while you claim it was OK in the previous Spring Boot minor version.
However, the observation instrumentation is still the same, so something needs to be shared with us so we can reproduce and play with it.

@artembilan
Member

@oooximionnn ,

thanks for the sample!

I haven't run it yet, but looking at the screenshot, I see that your "Сообщение отправлено успешно" ("Message sent successfully") entry is logged on the kafka-producer-network-thread, which is definitely outside your "Для прокидывания traceId в ассинхронных вызовах" ("For propagating traceId in asynchronous calls") configuration.
That thread belongs to the Apache Kafka client itself.
I don't know what Spring did before, but apparently there is no such instrumentation for KafkaProducer.
I'm not sure what it is supposed to be, but it is definitely out of scope for the Spring for Apache Kafka project.
In general, we have spread observation instrumentation throughout the Spring portfolio projects, so the KafkaTemplate instrumentation migrated from Spring Cloud Sleuth to this project.
Where the KafkaProducer instrumentation lives now, I need to figure out together with the Micrometer team.
I will be back when I have more info...

@artembilan
Member

Just tested the fix against this simple unit test:

	@Test
	void logsAreCorrelatedFromKafkaTemplateCallback() throws ExecutionException, InterruptedException {
		this.kafkaTemplate.send("test-topic", "test data")
				.thenAccept((result) -> LOG.warn("sent: " + result))
				.get();
	}

The output in the logs looks like this:

2024-03-21T11:29:42.890-04:00  WARN 39812 --- [spring-kafka-template-tracing] [cing-producer-1] [65fc526678dd9d4c091afda72b6512cc-091afda72b6512cc] ringKafkaTemplateTracingApplicationTests : sent: SendResult [producerRecord=ProducerRecord(topic=test-topic, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = traceparent, value = [48, 48, 45, 54, 53, 102, 99, 53, 50, 54, 54, 55, 56, 100, 100, 57, 100, 52, 99, 48, 57, 49, 97, 102, 100, 97, 55, 50, 98, 54, 53, 49, 50, 99, 99, 45, 48, 57, 49, 97, 102, 100, 97, 55, 50, 98, 54, 53, 49, 50, 99, 99, 45, 48, 48])], isReadOnly = true), key=null, value=test data, timestamp=null), recordMetadata=test-topic-1@0]

I think this is what you are looking for.

Will open PR soon.
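Editor's note: the fix itself is not shown inline in this thread. As a hedged sketch of its shape (our illustration based on the commit message below, not the actual KafkaTemplate source; completeUserFuture is a hypothetical placeholder):

    // The send observation is reopened on the thread that completes the
    // future, so user callbacks chained after send() see traceId/spanId
    // in the MDC.
    future.whenComplete((sendResult, throwable) -> {
        try (Observation.Scope scope = observation.openScope()) {
            completeUserFuture(sendResult, throwable);
        }
    });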

@artembilan artembilan added this to the 3.2.0-RC1 milestone Mar 21, 2024
artembilan added a commit to artembilan/spring-kafka that referenced this issue Mar 21, 2024
Fixes: spring-projects#3146

There is a requirement to see logs correlated even in the `CompletableFuture` result after `KafkaTemplate.send()` operation

* Add `observation.openScope()` into the `buildCallback()` logic

**Auto-cherry-pick to `3.1.x` & `3.0.x`**
sobychacko pushed a commit that referenced this issue Mar 21, 2024
Fixes: #3146

Propagate `KafkaTemplate` observation callback.

There is a requirement to see logs correlated in the `CompletableFuture` result 
after `KafkaTemplate.send()` operation.

* Add `observation.openScope()` into the `buildCallback()` logic

**Auto-cherry-pick to `3.1.x` & `3.0.x`**
artembilan added a commit that referenced this issue Mar 21, 2024
Fixes: #3146

Propagate `KafkaTemplate` observation callback.

There is a requirement to see logs correlated in the `CompletableFuture` result
after `KafkaTemplate.send()` operation.

* Add `observation.openScope()` into the `buildCallback()` logic

**Auto-cherry-pick to `3.0.x`**

# Conflicts:
#	spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java
spring-builds pushed a commit that referenced this issue Mar 21, 2024
Fixes: #3146

Propagate `KafkaTemplate` observation callback.

There is a requirement to see logs correlated in the `CompletableFuture` result
after `KafkaTemplate.send()` operation.

* Add `observation.openScope()` into the `buildCallback()` logic

# Conflicts:
#	spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

(cherry picked from commit fffd5ef)
@oooximionnn
Author

Thank you. Please tell me what I need to do, in the end, to get the traceId and spanId in the producer and the consumer:

  1. which dependencies are needed
  2. whether some bean-style configuration is required
  3. what should go into application.yml
  4. anything else?

We are using Spring Boot 3.

@artembilan
Member

@oooximionnn ,
If you would like to test the fix, you need to use a spring-kafka SNAPSHOT in your project.
The release is going to happen on April 15th.

@oooximionnn
Author

oooximionnn commented Apr 18, 2024

@artembilan
Hello, thank you. Now I see the trace id in the producer logs, but the listener logs still do not have the trace id.
(screenshots: 2024-04-18_15-54-02, 2024-04-18_15-53-18)

@artembilan
Member

@oooximionnn ,

It doesn't look like your question is related to the problem we discussed in this issue.

The @KafkaListener side is a fully different story.
Please consider raising a Stack Overflow question with much more info.
Meanwhile, see the observationEnabled property of ContainerProperties:

	/**
	 * Set to true to enable observation via Micrometer. When false (default)
	 * basic Micrometer timers are used instead (when enabled).
	 * @param observationEnabled true to enable.
	 * @since 3.0
	 * @see #setMicrometerEnabled(boolean)
	 */
	public void setObservationEnabled(boolean observationEnabled) {
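For readers looking for the consumer-side counterpart of the template setting: a minimal sketch of enabling observation on the listener container factory (assuming a Spring Boot auto-configured ConsumerFactory; bean and variable names are ours):

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConsumerFactory<Object, Object> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Enables Micrometer observation for records received by @KafkaListener,
        // so listener logs carry traceId/spanId
        factory.getContainerProperties().setObservationEnabled(true);
        return factory;
    }

To the best of our knowledge, Spring Boot also exposes this as the spring.kafka.listener.observation-enabled property (mirroring spring.kafka.template.observation-enabled on the producer side).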

@oooximionnn
Author

@artembilan
Thank you very much, the problem is solved :)
