Skip to content

Commit

Permalink
SQS Messaging Autoconfig (#1218)
Browse files Browse the repository at this point in the history
added support for SQS messaging tracing
  • Loading branch information
devinsba authored and marcingrzejszczak committed Oct 3, 2019
1 parent c7b8187 commit dc82c5c
Show file tree
Hide file tree
Showing 10 changed files with 468 additions and 16 deletions.
7 changes: 7 additions & 0 deletions docs/src/main/asciidoc/spring-cloud-sleuth.adoc
Expand Up @@ -1381,6 +1381,13 @@ To block this feature, set `spring.sleuth.messaging.jms.enabled` to `false`.

IMPORTANT: We don't support baggage propagation for JMS

==== Spring Cloud AWS Messaging SQS

We instrument `@SqsListener` which is provided by `org.springframework.cloud:spring-cloud-aws-messaging`
so that tracing headers get extracted from the message and a trace gets put into the context.

To block this feature, set `spring.sleuth.messaging.sqs.enabled` to `false`.

=== Zuul

We instrument the Zuul Ribbon integration by enriching the Ribbon requests with tracing information.
Expand Down
9 changes: 8 additions & 1 deletion pom.xml
Expand Up @@ -158,6 +158,13 @@
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-aws-dependencies</artifactId>
<version>${spring-cloud-aws.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-gateway-dependencies</artifactId>
Expand Down Expand Up @@ -250,8 +257,8 @@
</spring-cloud-openfeign.version>
<spring-security-boot-autoconfigure.version>2.1.7.RELEASE
</spring-security-boot-autoconfigure.version>
<spring-cloud-aws.version>2.2.0.BUILD-SNAPSHOT</spring-cloud-aws.version>
<disable.nohttp.checks>false</disable.nohttp.checks>

<okhttp.version>3.10.0</okhttp.version>
<mockwebserver.version>3.10.0</mockwebserver.version>
<guava.version>20.0</guava.version>
Expand Down
5 changes: 5 additions & 0 deletions spring-cloud-sleuth-core/pom.xml
Expand Up @@ -82,6 +82,11 @@
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-commons</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-aws-messaging</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
Expand Down
Expand Up @@ -17,12 +17,15 @@
package org.springframework.cloud.sleuth.instrument.messaging;

import java.lang.reflect.Field;
import java.util.Collections;
import java.util.List;

import brave.Span;
import brave.Tracer;
import brave.Tracing;
import brave.jms.JmsTracing;
import brave.kafka.clients.KafkaTracing;
import brave.propagation.Propagation;
import brave.spring.rabbit.SpringRabbitTracing;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
Expand All @@ -49,6 +52,8 @@
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.aws.messaging.config.QueueMessageHandlerFactory;
import org.springframework.cloud.aws.messaging.listener.QueueMessageHandler;
import org.springframework.cloud.sleuth.autoconfig.TraceAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -62,6 +67,11 @@
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ReflectionUtils;

/**
Expand All @@ -73,7 +83,8 @@
*/
@Configuration
@ConditionalOnBean(Tracing.class)
@AutoConfigureAfter({ TraceAutoConfiguration.class })
@AutoConfigureAfter({ TraceAutoConfiguration.class,
TraceSpringMessagingAutoConfiguration.class })
@OnMessagingEnabled
@EnableConfigurationProperties(SleuthMessagingProperties.class)
public class TraceMessagingAutoConfiguration {
Expand Down Expand Up @@ -188,6 +199,28 @@ TracingJmsBeanPostProcessor tracingJmsBeanPostProcessor(BeanFactory beanFactory)

}

@Configuration
@ConditionalOnProperty(value = "spring.sleuth.messaging.sqs.enabled",
matchIfMissing = true)
@ConditionalOnClass(QueueMessageHandler.class)
protected static class SleuthSqsConfiguration {

@Bean
TracingMethodMessageHandlerAdapter tracingMethodMessageHandlerAdapter(
Tracing tracing,
Propagation.Getter<MessageHeaderAccessor, String> traceMessagePropagationGetter) {
return new TracingMethodMessageHandlerAdapter(tracing,
traceMessagePropagationGetter);
}

@Bean
QueueMessageHandlerFactory sqsQueueMessageHandlerFactory(
TracingMethodMessageHandlerAdapter tracingMethodMessageHandlerAdapter) {
return new SqsQueueMessageHandlerFactory(tracingMethodMessageHandlerAdapter);
}

}

}

class SleuthRabbitBeanPostProcessor implements BeanPostProcessor {
Expand Down Expand Up @@ -422,3 +455,50 @@ Object sleuthDefaultKafkaHeaderMapper(Object bean) {
}

}

class SqsQueueMessageHandlerFactory extends QueueMessageHandlerFactory {

private TracingMethodMessageHandlerAdapter handlerAdapter;

SqsQueueMessageHandlerFactory(TracingMethodMessageHandlerAdapter handlerAdapter) {
this.handlerAdapter = handlerAdapter;
}

@Override
public QueueMessageHandler createQueueMessageHandler() {
if (CollectionUtils.isEmpty(getMessageConverters())) {
return new SqsQueueMessageHandler(handlerAdapter, Collections.emptyList());
}
return new SqsQueueMessageHandler(handlerAdapter, getMessageConverters());
}

}

class SqsQueueMessageHandler extends QueueMessageHandler {

// copied from QueueMessageHandler
private static final String LOGICAL_RESOURCE_ID = "LogicalResourceId";

private TracingMethodMessageHandlerAdapter handlerAdapter;

SqsQueueMessageHandler(TracingMethodMessageHandlerAdapter handlerAdapter,
List<MessageConverter> messageConverters) {
super(messageConverters);
this.handlerAdapter = handlerAdapter;
}

@Override
public void handleMessage(Message<?> message) throws MessagingException {
handlerAdapter.wrapMethodMessageHandler(message, super::handleMessage,
this::messageSpanTagger);
}

private void messageSpanTagger(Span span, Message<?> message) {
span.remoteServiceName("sqs");
if (message.getHeaders().get(LOGICAL_RESOURCE_ID) != null) {
span.tag("sqs.queue_url",
message.getHeaders().get(LOGICAL_RESOURCE_ID).toString());
}
}

}
Expand Up @@ -22,7 +22,6 @@
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.sleuth.autoconfig.TraceAutoConfiguration;
Expand All @@ -44,7 +43,8 @@
@Configuration
@ConditionalOnClass(GlobalChannelInterceptor.class)
@ConditionalOnBean(Tracing.class)
@AutoConfigureAfter({ TraceAutoConfiguration.class })
@AutoConfigureAfter({ TraceAutoConfiguration.class,
TraceSpringMessagingAutoConfiguration.class })
@OnMessagingEnabled
@ConditionalOnProperty(value = "spring.sleuth.integration.enabled", matchIfMissing = true)
@EnableConfigurationProperties(SleuthMessagingProperties.class)
Expand All @@ -67,16 +67,4 @@ TracingChannelInterceptor traceChannelInterceptor(Tracing tracing,
traceMessagePropagationGetter);
}

@Bean
@ConditionalOnMissingBean
Propagation.Setter<MessageHeaderAccessor, String> traceMessagePropagationSetter() {
return MessageHeaderPropagation.INSTANCE;
}

@Bean
@ConditionalOnMissingBean
Propagation.Getter<MessageHeaderAccessor, String> traceMessagePropagationGetter() {
return MessageHeaderPropagation.INSTANCE;
}

}
@@ -0,0 +1,46 @@
/*
* Copyright 2013-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.sleuth.instrument.messaging;

import brave.propagation.Propagation;

import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.support.MessageHeaderAccessor;

@Configuration
@ConditionalOnClass(MessageHeaderAccessor.class)
@OnMessagingEnabled
@EnableConfigurationProperties(SleuthMessagingProperties.class)
class TraceSpringMessagingAutoConfiguration {

@Bean
@ConditionalOnMissingBean
Propagation.Setter<MessageHeaderAccessor, String> traceMessagePropagationSetter() {
return MessageHeaderPropagation.INSTANCE;
}

@Bean
@ConditionalOnMissingBean
Propagation.Getter<MessageHeaderAccessor, String> traceMessagePropagationGetter() {
return MessageHeaderPropagation.INSTANCE;
}

}
@@ -0,0 +1,109 @@
/*
* Copyright 2013-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.sleuth.instrument.messaging;

import java.util.function.BiConsumer;

import brave.Span;
import brave.Tracer;
import brave.Tracing;
import brave.propagation.Propagation;
import brave.propagation.TraceContext;
import brave.propagation.TraceContextOrSamplingFlags;

import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.support.MessageHeaderAccessor;

import static brave.Span.Kind.CONSUMER;

/**
* Adds tracing extraction to an instance of
* {@link org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler}
* in a reusable way. When sub-classing a provider specific class of that type you would
* wrap the <pre>super.handleMessage(...)</pre> call with a call to this. See
* {@link org.springframework.cloud.sleuth.instrument.messaging.SqsQueueMessageHandler}
* for an example.
*
* This implementation also allows for supplying a {@link java.util.function.BiConsumer}
* instance that can be used to add queue specific tags and modifications to the span.
*
* @author Brian Devins-Suresh
*/
class TracingMethodMessageHandlerAdapter {

private Tracing tracing;

private Tracer tracer;

private TraceContext.Extractor<MessageHeaderAccessor> extractor;

TracingMethodMessageHandlerAdapter(Tracing tracing,
Propagation.Getter<MessageHeaderAccessor, String> traceMessagePropagationGetter) {
this.tracing = tracing;
this.tracer = tracing.tracer();
this.extractor = tracing.propagation().extractor(traceMessagePropagationGetter);
}

void wrapMethodMessageHandler(Message<?> message, MessageHandler messageHandler,
BiConsumer<Span, Message<?>> messageSpanTagger) {
TraceContextOrSamplingFlags extracted = extractAndClearHeaders(message);

Span consumerSpan = tracer.nextSpan(extracted);
Span listenerSpan = tracer.newChild(consumerSpan.context());

if (!consumerSpan.isNoop()) {
consumerSpan.name("next-message").kind(CONSUMER);
if (messageSpanTagger != null) {
messageSpanTagger.accept(consumerSpan, message);
}

// incur timestamp overhead only once
long timestamp = tracing.clock(consumerSpan.context())
.currentTimeMicroseconds();
consumerSpan.start(timestamp);
long consumerFinish = timestamp + 1L; // save a clock reading
consumerSpan.finish(consumerFinish);

// not using scoped span as we want to start with a pre-configured time
listenerSpan.name("on-message").start(consumerFinish);
}

try (Tracer.SpanInScope ws = tracer.withSpanInScope(listenerSpan)) {
messageHandler.handleMessage(message);
}
catch (Throwable t) {
listenerSpan.error(t);
throw t;
}
finally {
listenerSpan.finish();
}
}

private TraceContextOrSamplingFlags extractAndClearHeaders(Message<?> message) {
MessageHeaderAccessor headers = MessageHeaderAccessor.getMutableAccessor(message);
TraceContextOrSamplingFlags extracted = extractor.extract(headers);

for (String propagationKey : tracing.propagation().keys()) {
headers.removeHeader(propagationKey);
}

return extracted;
}

}
Expand Up @@ -24,10 +24,12 @@ org.springframework.cloud.sleuth.instrument.grpc.TraceGrpcAutoConfiguration,\
org.springframework.cloud.sleuth.instrument.messaging.SleuthKafkaStreamsConfiguration,\
org.springframework.cloud.sleuth.instrument.messaging.TraceMessagingAutoConfiguration,\
org.springframework.cloud.sleuth.instrument.messaging.TraceSpringIntegrationAutoConfiguration,\
org.springframework.cloud.sleuth.instrument.messaging.TraceSpringMessagingAutoConfiguration,\
org.springframework.cloud.sleuth.instrument.messaging.websocket.TraceWebSocketAutoConfiguration,\
org.springframework.cloud.sleuth.instrument.opentracing.OpentracingAutoConfiguration,\
org.springframework.cloud.sleuth.instrument.redis.TraceRedisAutoConfiguration,\
org.springframework.cloud.sleuth.instrument.quartz.TraceQuartzAutoConfiguration

# Environment Post Processor
org.springframework.boot.env.EnvironmentPostProcessor=\
org.springframework.cloud.sleuth.autoconfig.TraceEnvironmentPostProcessor

0 comments on commit dc82c5c

Please sign in to comment.