
In kafka events, the traceId and spanId are not written in the logs #4859

Closed
oooximionnn opened this issue Mar 18, 2024 · 4 comments
Labels
for: external-project For an external project and not something we can fix

Comments

@oooximionnn

oooximionnn commented Mar 18, 2024

Description
In kafka events, the traceId and spanId are not written in the logs

Environment
Java 17. Spring boot starter 3.2.2
We use the following dependencies for tracing:

        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-tracing</artifactId>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-tracing-bridge-otel</artifactId>
        </dependency>
        <dependency>
            <groupId>io.opentelemetry</groupId>
            <artifactId>opentelemetry-exporter-otlp</artifactId>
        </dependency>

Kafka Template Bean:

import io.micrometer.common.KeyValues;
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.micrometer.KafkaRecordSenderContext;
import org.springframework.kafka.support.micrometer.KafkaTemplateObservationConvention;


@Configuration
@ComponentScan
@RequiredArgsConstructor
public class KafkaConfig {
    @Bean
    public KafkaTemplate<Object, Object> kafkaTemplate(final ProducerFactory<Object, Object> producerFactory) {
        final KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory);
        kafkaTemplate.setObservationEnabled(true);
        kafkaTemplate.setObservationConvention(new KafkaTemplateObservationConvention() {
            @Override
            public KeyValues getLowCardinalityKeyValues(KafkaRecordSenderContext context) {
                return KeyValues.of("topic", context.getDestination(),
                        "id", String.valueOf(context.getRecord().key()));
            }
        });
        return kafkaTemplate;
    }
}
Async configuration:

import io.micrometer.context.ContextExecutorService;
import io.micrometer.context.ContextScheduledExecutorService;
import io.micrometer.context.ContextSnapshot;
import io.micrometer.context.ContextSnapshotFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;

@Configuration(proxyBeanMethods = false)
class CommonConfiguration {

    // Example of Async Servlets setup
    @Configuration(proxyBeanMethods = false)
    @EnableAsync
    static class AsyncConfig implements AsyncConfigurer, WebMvcConfigurer {
        @Override
        public Executor getAsyncExecutor() {
            return ContextExecutorService.wrap(Executors.newCachedThreadPool(), ContextSnapshot::captureAll);
        }

        @Override
        public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
            configurer.setTaskExecutor(new SimpleAsyncTaskExecutor(r -> new Thread(ContextSnapshotFactory.builder().build().captureAll().wrap(r))));
        }
    }


    /**
     * NAME OF THE BEAN IS IMPORTANT!
     * <p>
     * We need to wrap this for @Async related things to propagate the context.
     *
     * @see EnableAsync
     */
    // [Observability] instrumenting executors
    @Bean(name = "taskExecutor", destroyMethod = "shutdown")
    ThreadPoolTaskScheduler threadPoolTaskScheduler() {
        ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler() {
            @Override
            protected ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
                ExecutorService executorService = super.initializeExecutor(threadFactory, rejectedExecutionHandler);
                return ContextExecutorService.wrap(executorService, ContextSnapshot::captureAll);
            }


            @Override
            public ScheduledExecutorService getScheduledExecutor() throws IllegalStateException {
                return ContextScheduledExecutorService.wrap(super.getScheduledExecutor());
            }
        };
        threadPoolTaskScheduler.initialize();
        return threadPoolTaskScheduler;
    }
}

application.yml:

logging:
  pattern:
    console: "%d{yyyy-MM-dd HH:mm:ss.SSS} %5p [${spring.application.name:-},%X{traceId:-},%X{spanId:-}] 0 --- [%15.80t] %-40.40logger{39} : %msg%n"

management:
  tracing:
    sampling:
      probability: 1.0
    propagation:
      produce: b3
      consume: b3
      type: b3,w3c

Producer code:

@Async
    protected void sendMessage(Object message, String key, String topic) {

        kafkaTemplate.send(topic, key, message)
                .thenAccept(result -> {
                    log.debug("producerListener -> Message was sent. msg: {}", result);
                })
                .exceptionally(ex -> {
                    log.error("producerListener -> Error: {}", message, ex);
                    return null;
                });
    }

What's going on
The producer instrumentation places traceId and spanId in the Kafka record headers, but by the time the send callback runs, the MDC on that thread is empty, so the logging pattern cannot find traceId and spanId and does not write them to the logs.

What we changed

 @Async
    protected void sendMessage(Object message, String key, String topic) {

        Map<String, String> copyOfContextMap = MDC.getCopyOfContextMap();

        kafkaTemplate.send(topic, key, message)
                .thenAccept(result -> {
                    MDC.setContextMap(copyOfContextMap);
                    log.debug("producerListener -> Message was sent. msg: {}", result);
                })
                .exceptionally(ex -> {
                    MDC.setContextMap(copyOfContextMap);
                    log.error("producerListener -> Error: {}", message, ex);
                    return null;
                });
    }

We added Map<String, String> copyOfContextMap = MDC.getCopyOfContextMap(); to the producer and restored it in the callbacks, and the traceId and spanId began to appear in the logs.

Is there a better solution that avoids manually copying and restoring the MDC context map like this?
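One alternative worth considering (a sketch only, assuming Micrometer's context-propagation library is on the classpath with the appropriate thread-local accessors registered; not verified against the reporter's setup) is to capture a ContextSnapshot once on the producing thread and restore it around the callbacks, instead of copying the MDC map by hand:

```java
import io.micrometer.context.ContextSnapshot;
import io.micrometer.context.ContextSnapshotFactory;

// Fragment only: kafkaTemplate, log, message, key and topic come from the
// surrounding class, as in the original snippet.
ContextSnapshot snapshot = ContextSnapshotFactory.builder().build().captureAll();

kafkaTemplate.send(topic, key, message)
        // wrap(Consumer) restores the captured thread-locals (including the
        // MDC, if an accessor is registered) around the callback, on
        // whatever thread it happens to run
        .thenAccept(snapshot.wrap(result ->
                log.debug("producerListener -> Message was sent. msg: {}", result)))
        .exceptionally(ex -> {
            // exceptionally takes a Function, for which ContextSnapshot has
            // no wrap() overload, so open a scope manually instead
            try (ContextSnapshot.Scope scope = snapshot.setThreadLocals()) {
                log.error("producerListener -> Error: {}", message, ex);
            }
            return null;
        });
```

This keeps the capture/restore mechanics in one place rather than spreading MDC calls through every callback, but it is still an explicit capture on the caller's side.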

@jonatan-ivanov
Member

jonatan-ivanov commented Mar 18, 2024

You might be in the wrong repo. :)
Micrometer Tracing is for Distributed Tracing but you might want to open this issue for Spring Kafka instead.

Also, please consider these:

  • Why is the KafkaTemplateObservationConvention relevant? Can you repro the issue without it?
  • How about the custom AsyncConfig/ThreadPoolTaskScheduler and ContextExecutorService usage?
  • logging.pattern.console
  • You might want to check what the properties under management.tracing.propagation are doing (type overrides the other two)
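On the propagation point: in Spring Boot, management.tracing.propagation.type overrides the fine-grained produce/consume properties, so the configuration in the issue effectively uses only b3,w3c. A hedged sketch of the two mutually exclusive styles:

```yaml
management:
  tracing:
    propagation:
      # Option A: one list for both producing and consuming.
      # When 'type' is set, it overrides 'produce' and 'consume'.
      type: b3,w3c

      # Option B: fine-grained control (remove 'type' above first):
      # produce: b3
      # consume: b3,w3c
```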

I'm not sure whether this is caused by your custom async config or by a missing scope creation somewhere in the instrumentation.

/cc @artembilan

@jonatan-ivanov closed this as not planned (won't fix, can't repro, duplicate, stale) Mar 18, 2024
@jonatan-ivanov added the "for: external-project" label and removed the "waiting-for-triage" label Mar 18, 2024
@oooximionnn
Author

oooximionnn commented Mar 19, 2024

I did as you said regarding "Why is the KafkaTemplateObservationConvention relevant? Can you repro the issue without it?" and removed the convention.
But it didn't help.

We also tried the example from this link: https://github.com/micrometer-metrics/micrometer-samples/blob/main/kafka-producer/src/main/java/com/example/micrometer/ManualConfiguration.java
It didn't help either.

@oooximionnn
Author

oooximionnn commented Mar 19, 2024

The problem is related to the migration to Spring Boot 3. On Spring Boot 2, the Sleuth library was used for tracing, and Kafka event tracing worked. But when we started using the examples and instructions from Micrometer, the trace id did not appear in the logs; it only appeared in the Kafka headers.

@artembilan

kafkaTemplate.setObservationEnabled(true); adds an observation around the producer record send operation.
It has nothing to do with whatever happens afterwards with the returned Future.
I believe that .thenAccept(result -> { might be performed on a different thread, which is outside of @Async control.
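The point above can be reproduced with plain JDK classes: a ThreadLocal (standing in for the MDC) set on the caller thread is not visible in a CompletableFuture callback that completes on another thread, and copying/restoring the value fixes it. A self-contained sketch, independent of Kafka and Spring:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

public class ContextLossDemo {
    // Stand-in for the MDC: a ThreadLocal holding a "traceId"
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    public static void main(String[] args) throws Exception {
        TRACE_ID.set("abc123");            // set on the caller thread
        String captured = TRACE_ID.get();  // like MDC.getCopyOfContextMap()

        AtomicReference<String> beforeRestore = new AtomicReference<>();
        AtomicReference<String> afterRestore = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        CompletableFuture<String> send = new CompletableFuture<>();
        // Registered before completion, so the callback runs on the thread
        // that later calls complete(), not on the caller thread.
        send.thenAccept(result -> {
            beforeRestore.set(TRACE_ID.get()); // null: different thread
            TRACE_ID.set(captured);            // like MDC.setContextMap(copy)
            afterRestore.set(TRACE_ID.get());
            done.countDown();
        });
        new Thread(() -> send.complete("sent")).start();
        done.await();

        System.out.println("before restore: " + beforeRestore.get()); // null
        System.out.println("after restore: " + afterRestore.get());   // abc123
    }
}
```

This is why restoring the context (by hand, or via a wrapping mechanism) is needed in the callbacks: the @Async wrapping only covers the thread that calls send(), not the thread that completes the future.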
