Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spring Sleuth Correlating Messages #96

Closed
englishbreeze opened this issue Jan 6, 2016 · 15 comments
Closed

Spring Sleuth Correlating Messages #96

englishbreeze opened this issue Jan 6, 2016 · 15 comments

Comments

@englishbreeze
Copy link

Hi,
I have a service that sends messages to another service and would like to correlate them together. The problem is that Spring Sleuth does not correlate the two services together (i.e. Set the parent trace id of the receiver to the trace id of the sender) I'm using Spring Integration MessageChannel to send message from one service to another. Here is my channel configuration for the outbound channel:

    @Bean
@ServiceActivator(inputChannel = "amqpOutboundChannel")
public AmqpOutboundEndpoint amqpOutbound(AmqpTemplate amqpTemplate) {
    AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate);
    outbound.setRoutingKey("myQueue"); // default exchange - route to queue
                                    // 'foo'
    return outbound;
}

@Bean
public MessageChannel amqpOutboundChannel() {
    return new DirectChannel();
}

sending the Message using this code:
MessagingTemplate template = new MessagingTemplate();
template.send(channel, new GenericMessage("blahblah"));

Here is my channel configuration for the input channel:

    @Bean
public MessageChannel amqpInputChannel() {
    return new DirectChannel();
}

@Bean
public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer,
        @Qualifier("amqpInputChannel") MessageChannel channel) {
    AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
    adapter.setOutputChannel(channel);
    return adapter;
}

@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setQueueNames("myQueue");
    // ...
    return container;
}

@Bean
@ServiceActivator(inputChannel = "amqpInputChannel")
public MessageHandler handler() {
    return new MessageHandler() {

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println(message.getPayload());
        }

    };

Can you guys let me know how I'm suppose to correlate the two services using Messaging?

@marcingrzejszczak
Copy link
Contributor

marcingrzejszczak commented Jan 7, 2016 via email

@marcingrzejszczak
Copy link
Contributor

marcingrzejszczak commented Jan 7, 2016 via email

@gauravrmazra
Copy link
Contributor

The problem is interceptors which traces between services is not set in case of messaging. In your channel (DirectChannel) set following interceptor.
TraceChannelInterceptor

In case you want to shift to ExecutorChannel in future, then along with above interceptor set this as well
TraceContextPropagationChannelInterceptor

@marcingrzejszczak
Copy link
Contributor

@gauravrmazra thank you for your feedback and support. That's highly appreciated.

However the thing is we want users not to need to add any interceptors manually. We would like them use approaches that will set things automatically for them.

My example is exactly showing that case.

First enable binding (@englishbreeze - you'll have one binding on the sender and one on the receiver side - in this example I have both in one place)

https://github.com/spring-cloud-samples/brewery/blob/master/brewing/src/main/java/io/spring/cloud/samples/brewery/Application.java#L18

Then use the approaches mentioned in my comment below.

Using Spring Cloud Stream is the suggested way of making applications talk to each other indirectly via messaging.

@gauravrmazra
Copy link
Contributor

@marcingrzejszczak Agreed.
But at times, user will be writing their in their own way using messaging components (as OP). May be using spring-integration-java-dsl for integration. In that case, It will be nice if they know what they are missing in their component for tracing.

My experience was little different when I was evaluating this for Stomp messages. It took 2-3 days of effort for understanding that SimpMessagingTemplate(even created by Spring for you) doesn't have ChannelInterceptors injected. You need to extend AbstractWebSocketMessageBrokerConfigurer and overrider configureMessageBroker method and need to add interceptors yourself in brokerChannel of MessageBrokerRegistry.

Long story short, at times you can't reply on what Spring sets up for you by default. User need to be aware of what part is missing and how they can relying on tracing if they are configuring components themselves.

@marcingrzejszczak
Copy link
Contributor

@gauravrmazra you are right but I guess what @englishbreeze was asking about is how to do things properly.

3b6b49f - I think that this proves that it's enough to register MessagingTemplate as a bean and Sleuth's autowiring will work properly in terms of intercepting channels. If that's not the case then we'll have to fix it so it does work out of the box

We can't support all possible scenarios of manual creation of things. But thanks for your support @gauravrmazra . We really appreciate it!!

@englishbreeze
Copy link
Author

Thank you for the reply @marcingrzejszczak and @gauravrmazra . If I wanted to use AMQP to send messages from one service to another via some broker (e.g. RabbitMQ), how do I set it up such that spring sleuth will correlate the sender and the receiver. I tried @gauravrmazra 's suggestion of adding TraceChannelInterptor in my DirectChannel bean but it still does not correlate them together.

@englishbreeze
Copy link
Author

Thanks for your help @marcingrzejszczak . Do you know how to resolve this error?

org.springframework.beans.factory.BeanDefinitionStoreException: Failed to process import candidates for configuration class [com.Application]; nested exception is java.lang.IllegalStateException: Error processing condition on org.springframework.cloud.sleuth.stream.SleuthStreamAutoConfiguration
at org.springframework.context.annotation.ConfigurationClassParser.processImports(ConfigurationClassParser.java:518) ~[spring-context-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.context.annotation.ConfigurationClassParser.processDeferredImportSelectors(ConfigurationClassParser.java:454) ~[spring-context-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.context.annotation.ConfigurationClassParser.parse(ConfigurationClassParser.java:185) ~[spring-context-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.context.annotation.ConfigurationClassPostProcessor.processConfigBeanDefinitions(ConfigurationClassPostProcessor.java:321) ~[spring-context-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.context.annotation.ConfigurationClassPostProcessor.postProcessBeanDefinitionRegistry(ConfigurationClassPostProcessor.java:243) ~[spring-context-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanDefinitionRegistryPostProcessors(PostProcessorRegistrationDelegate.java:273) ~[spring-context-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors(PostProcessorRegistrationDelegate.java:98) ~[spring-context-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.invokeBeanFactoryPostProcessors(AbstractApplicationContext.java:678) ~[spring-context-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:520) ~[spring-context-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:118) ~[spring-boot-1.3.1.RELEASE.jar:1.3.1.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:764) [spring-boot-1.3.1.RELEASE.jar:1.3.1.RELEASE]
at org.springframework.boot.SpringApplication.doRun(SpringApplication.java:357) [spring-boot-1.3.1.RELEASE.jar:1.3.1.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:305) [spring-boot-1.3.1.RELEASE.jar:1.3.1.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1124) [spring-boot-1.3.1.RELEASE.jar:1.3.1.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1113) [spring-boot-1.3.1.RELEASE.jar:1.3.1.RELEASE]
at com.fincad.simpleProxy.Application.main(Application.java:37) [classes/:na]
Caused by: java.lang.IllegalStateException: Error processing condition on org.springframework.cloud.sleuth.stream.SleuthStreamAutoConfiguration
at org.springframework.boot.autoconfigure.condition.SpringBootCondition.matches(SpringBootCondition.java:64) ~[spring-boot-autoconfigure-1.3.1.RELEASE.jar:1.3.1.RELEASE]
at org.springframework.context.annotation.ConditionEvaluator.shouldSkip(ConditionEvaluator.java:102) ~[spring-context-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.context.annotation.ConfigurationClassParser.processConfigurationClass(ConfigurationClassParser.java:203) ~[spring-context-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.context.annotation.ConfigurationClassParser.processImports(ConfigurationClassParser.java:509) ~[spring-context-4.2.4.RELEASE.jar:4.2.4.RELEASE]
... 15 common frames omitted
Caused by: java.lang.IllegalStateException: The name or value attribute of @ConditionalOnProperty must be specified
at org.springframework.util.Assert.state(Assert.java:392) ~[spring-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.boot.autoconfigure.condition.OnPropertyCondition.getNames(OnPropertyCondition.java:101) ~[spring-boot-autoconfigure-1.3.1.RELEASE.jar:1.3.1.RELEASE]
at org.springframework.boot.autoconfigure.condition.OnPropertyCondition.getMatchOutcome(OnPropertyCondition.java:55) ~[spring-boot-autoconfigure-1.3.1.RELEASE.jar:1.3.1.RELEASE]
at org.springframework.boot.autoconfigure.condition.SpringBootCondition.matches(SpringBootCondition.java:47) ~[spring-boot-autoconfigure-1.3.1.RELEASE.jar:1.3.1.RELEASE]
... 18 common frames omitted

@marcingrzejszczak
Copy link
Contributor

Hmm at the first glance it seems like a mismatch in versions of Spring, Spring Boot and Sleuth - can we close this issue and move to Gitter with further explanations?

@englishbreeze
Copy link
Author

Yes

dsyer pushed a commit that referenced this issue Jan 26, 2016
Now that the span data is stored in a header it is safe to remove
the slightly clunky propagation implentation that used a subclass.

Also removed the Stomp* features because they were diverging from
the mainstream integration support and no-one seems to understand
why they are needed. If the original author of #96 can explain
why they were needed we can ask for a new PR to re-instate a version
that works with the new model.
@gauravrmazra
Copy link
Contributor

@dsyer

TraceChannelInterceptor can't trace Stomp protocol based messages sent using Spring. It has different message headers[normal headers + native headers] than the usual JMS message.

use case: STOMP messages are usually used to pass data over websocket or any message broker (JMS, ActiveMQ etc.) can also be used for it.

e.g. https://spring.io/guides/gs/messaging-stomp-websocket/
e.g. https://dzone.com/articles/easy-messaging-stomp-over

@dsyer
Copy link
Contributor

dsyer commented Feb 8, 2016

There is nothing in Sleuth specific to JMS: it is all using the spring messaging APIs (which is the same as the STOMP support in Spring). So I don't really understand the issue yet.

@gauravrmazra
Copy link
Contributor

JMS and STOMP message use different message builder. They have different header accessors and different type of MessageHeaders.
e.g. JMS message Builder => org.springframework.integration.support.MessageBuilder
STOMP message builder => org.springframework.messaging.support.MessageBuilder

jms header accessor => IntegrationMessageHeaderAccessor
STOMP header accessor => org.springframework.messaging.support.MessageHeaderAccessor.MessageHeaderAccessor(Message<?>)

@dsyer
Copy link
Contributor

dsyer commented Feb 8, 2016

Yes, but a) it's not JMS, it's Spring Integration and b) they both use the same core API for Message, MessageChannel, interceptors etc. so I still think we should be able to use the same interceptor.

In any case can we stop discussing this here, in an unrelated, closed issue?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants