Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagation from Kafka to RestTemplate broken #26

Closed
timtebeek opened this issue Jul 19, 2018 · 10 comments
Closed

Propagation from Kafka to RestTemplate broken #26

timtebeek opened this issue Jul 19, 2018 · 10 comments

Comments

@timtebeek
Copy link

timtebeek commented Jul 19, 2018

Hi all,

Here's a minimal project to demonstrate an issue I'm having:
https://github.com/timtebeek/opentracing-demo
The project consists of three components: a web frontend, kafka backend and rest backend. The frontend calls the rest backend, and puts five messages on kafka. The Kafka backend picks up the messages and post to the rest backend as well.

The problem I'm having is in the @KafkaListener here: https://github.com/timtebeek/opentracing-demo/blob/master/demo-backend-kafka/src/main/java/demo/kafkabackend/DemoKafkaListener.java#L42

It correctly picks up and reports the trace ids coming in on Kafka; as evidenced by the logs here:

o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=kafka-backend] Discovered group coordinator tim-XPS-15-9560:9092 (id: 2147483647 rack: null)
o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=kafka-backend] Revoking previously assigned partitions []
o.s.k.l.KafkaMessageListenerContainer - partitions revoked: []
o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=kafka-backend] (Re-)joining group
o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=kafka-backend] Successfully joined group with generation 13
o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=kafka-backend] Setting newly assigned partitions [opentracing-demo-0]
o.s.k.l.KafkaMessageListenerContainer - partitions assigned: [opentracing-demo-0]
i.j.i.reporters.LoggingReporter - Span reported: 3207769d3be0d5c5:f1854ae1278b9fe8:d01d70634dca85f4:1 - receive
i.j.i.reporters.LoggingReporter - Span reported: 3207769d3be0d5c5:d5e004576fe71064:ea888f922583ab16:1 - receive
i.j.i.reporters.LoggingReporter - Span reported: 3207769d3be0d5c5:f2e75089bb249f5d:c7e1fdc77e1e78cc:1 - receive
i.j.i.reporters.LoggingReporter - Span reported: 3207769d3be0d5c5:994361341b8ad741:5263077bece29bb5:1 - receive
i.j.i.reporters.LoggingReporter - Span reported: 3207769d3be0d5c5:cc7affe6421806b0:bd9bc91d7647b9f8:1 - receive

However, within the @KafkaListener the tracer.activeSpan remains null, so the information is not propagated with the rest calls to the rest backend:

demo.kafkabackend.DemoKafkaListener - Received Message 1
demo.kafkabackend.DemoKafkaListener - Uber trace id:   3207769d3be0d5c5:d01d70634dca85f4:3207769d3be0d5c5:1
demo.kafkabackend.DemoKafkaListener - Second trace id: 3207769d3be0d5c5:f1854ae1278b9fe8:d01d70634dca85f4:1
demo.kafkabackend.DemoKafkaListener - Active span: null
i.j.i.reporters.LoggingReporter - Span reported: 765d81a656a7ff85:765d81a656a7ff85:0:1 - GET
demo.kafkabackend.DemoKafkaListener - REST API call returns OK
demo.kafkabackend.DemoKafkaListener - Received Message 2
demo.kafkabackend.DemoKafkaListener - Uber trace id:   3207769d3be0d5c5:ea888f922583ab16:3207769d3be0d5c5:1
demo.kafkabackend.DemoKafkaListener - Second trace id: 3207769d3be0d5c5:d5e004576fe71064:ea888f922583ab16:1
demo.kafkabackend.DemoKafkaListener - Active span: null
i.j.i.reporters.LoggingReporter - Span reported: 577936584a049514:577936584a049514:0:1 - GET
demo.kafkabackend.DemoKafkaListener - REST API call returns OK
demo.kafkabackend.DemoKafkaListener - Received Message 3
demo.kafkabackend.DemoKafkaListener - Uber trace id:   3207769d3be0d5c5:c7e1fdc77e1e78cc:3207769d3be0d5c5:1
demo.kafkabackend.DemoKafkaListener - Second trace id: 3207769d3be0d5c5:f2e75089bb249f5d:c7e1fdc77e1e78cc:1
demo.kafkabackend.DemoKafkaListener - Active span: null
i.j.i.reporters.LoggingReporter - Span reported: eb966a57e60e4e23:eb966a57e60e4e23:0:1 - GET
demo.kafkabackend.DemoKafkaListener - REST API call returns OK
demo.kafkabackend.DemoKafkaListener - Received Message 4
demo.kafkabackend.DemoKafkaListener - Uber trace id:   3207769d3be0d5c5:5263077bece29bb5:3207769d3be0d5c5:1
demo.kafkabackend.DemoKafkaListener - Second trace id: 3207769d3be0d5c5:994361341b8ad741:5263077bece29bb5:1
demo.kafkabackend.DemoKafkaListener - Active span: null
i.j.i.reporters.LoggingReporter - Span reported: b0d559f98873e047:b0d559f98873e047:0:1 - GET
demo.kafkabackend.DemoKafkaListener - REST API call returns OK
demo.kafkabackend.DemoKafkaListener - Received Message 5
demo.kafkabackend.DemoKafkaListener - Uber trace id:   3207769d3be0d5c5:bd9bc91d7647b9f8:3207769d3be0d5c5:1
demo.kafkabackend.DemoKafkaListener - Second trace id: 3207769d3be0d5c5:cc7affe6421806b0:bd9bc91d7647b9f8:1
demo.kafkabackend.DemoKafkaListener - Active span: null
i.j.i.reporters.LoggingReporter - Span reported: c6cb251ef5a07042:c6cb251ef5a07042:0:1 - GET
demo.kafkabackend.DemoKafkaListener - REST API call returns OK

I would have expected opentracing-kafka-spring to set the activeSpan on my tracer bean to make it available within my @KafkaListener method, so it'll get propagated correctly using the RestTemplate instrumented through opentracing-spring-jaeger-cloud-starter.

What needs to change to make this work? At present my trace information is lost and a new trace is started, making it unsuitable for full tracing.

@malafeev
Copy link
Contributor

malafeev commented Jul 19, 2018

@timtebeek you can extract SpanContext from headers (copied to string map) and then use it as parent:

 @KafkaListener(topics = "topic")
  public void listen(@Payload String message, @Headers Map<String, Object> headers) {
    // copy Map<String, Object> to Map<String, String> if value is String:
      Map<String, String> strHeaders = new HashMap<>();
      for (Entry<String, Object> entry : headers.entrySet()) {
        if(entry.getValue() instanceof String) {
          strHeaders.put(entry.getKey(), (String) entry.getValue());
        }
      }
      SpanContext spanContext = tracer
          .extract(Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(strHeaders));

    try( Scope activeScope = tracer.buildSpan("new_span").asChildOf(spanContext).startActive(true)) {
      ResponseEntity response = restTemplate.getForEntity(restEndPoint, Void.class);
    }
}

@timtebeek
Copy link
Author

Thanks for the quick reply; While that could in theory work; it seems quite cumbersome to do that for all of my KafkaListeners across all of my services, just to be able to pass tracing data from kafka to rest. Is there another way? Some hooks within opentracing-kafka-spring maybe?

@yurishkuro
Copy link

yurishkuro commented Sep 9, 2018

I am also curious why this wasn't done automatically when invoking listeners annotated with @KafkaListener. I am not too familiar with Spring internals, does it provide an easy way to decorate the listeners?

Btw, the extract example above seems incorrect, because it will extract the span context stored in the message by the producer. The consumer already read that context and created another span that is also serialized back into the message with something like second_span_ header prefixes. It's that second span that you want to use as the base for the listener. Also, it should be using follows-from reference, because the consumer span is already finished.

@malafeev
Copy link
Contributor

malafeev commented Sep 10, 2018

does it provide an easy way to decorate the listeners?

There is no easy way, at least not figured out yet.

extract example above seems incorrect

Sure, but there is no solution. It's more like possible direction to try if you know how it works.

@yurishkuro
Copy link

there is an extract adapter class that could be used with second=true argument, but its constructor is not public. I had to write my own adapter, because the example above creates the span representing the handler as descending directly from producer span, which is logically incorrect (it should reference the consumer span).

@malafeev
Copy link
Contributor

@yurishkuro could you submit PR if it can be useful for others?

@safetytrick
Copy link

Both the consumer and producer in this library are a bit off, they open and close a span immediately, this is more forgiveable in the case of the producer as writing to kafka isn't particularily interesting and this does inject the SpanContext into the kafka headers (but if a span is being created around the message it should track some work). In the case of the consumer it is the expectation that the span cover the processing of a message but the kafka batch based apis don't really make this very simple to implement as a library. The message listener container apis for spring seem like they could do this.

@eugene-khyst
Copy link

eugene-khyst commented Mar 18, 2020

Is there a nice solution already?

There is a good article with a workaround idea: https://zhaohuabing.com/post/2019-07-02-using-opentracing-with-istio-english/

Will the workaround based on AOP (aspect around methods with @KafkaListener) properly work or are there pitfalls?

@Aspect
@RequiredArgsConstructor
public class KafkaListenerTracingAspect {

  private final Tracer tracer;

  @Around("@annotation(org.springframework.kafka.annotation.KafkaListener)")
  public void aroundAdvice(ProceedingJoinPoint jp) throws Throwable {
    ConsumerRecord record = (ConsumerRecord) jp.getArgs()[0];
    SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
    Span span = tracer
        .buildSpan(jp.getTarget().getClass().getName())
        .asChildOf(spanContext)
        .start();
    tracer.activateSpan(span);
    jp.proceed();
    span.finish();
  }
}

@orange-buffalo
Copy link
Contributor

It makes sense to have a look at Spring Sleuth implementation: https://github.com/spring-cloud/spring-cloud-sleuth/blob/e6ebce4505097aacff72a97481ab571384e08055/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/messaging/TraceMessagingAutoConfiguration.java#L140. It is easily adaptable to opentracing-kafka-client instead of brave tracing.

orange-buffalo pushed a commit to orange-buffalo/java-kafka-client that referenced this issue Oct 27, 2020
orange-buffalo added a commit to orange-buffalo/java-kafka-client that referenced this issue Oct 27, 2020
orange-buffalo added a commit to orange-buffalo/java-kafka-client that referenced this issue Oct 27, 2020
@orange-buffalo
Copy link
Contributor

orange-buffalo commented Oct 30, 2020

This is fixed in version 0.1.15: @KafkaListener is now supported without workarounds and custom code.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

6 participants