From dc82c5cafcd30708df90681c13fcde1bfd0b09da Mon Sep 17 00:00:00 2001 From: Brian Devins-Suresh Date: Thu, 3 Oct 2019 06:18:32 -0400 Subject: [PATCH] SQS Messaging Autoconfig (#1218) added support for SQS messaging tracing --- .../main/asciidoc/spring-cloud-sleuth.adoc | 7 + pom.xml | 9 +- spring-cloud-sleuth-core/pom.xml | 5 + .../TraceMessagingAutoConfiguration.java | 82 ++++++++++- ...aceSpringIntegrationAutoConfiguration.java | 16 +-- ...TraceSpringMessagingAutoConfiguration.java | 46 ++++++ .../TracingMethodMessageHandlerAdapter.java | 109 ++++++++++++++ .../main/resources/META-INF/spring.factories | 2 + ...acingMethodMessageHandlerAdapterTests.java | 135 ++++++++++++++++++ .../SqsQueueMessageHandlerTests.java | 73 ++++++++++ 10 files changed, 468 insertions(+), 16 deletions(-) create mode 100644 spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/messaging/TraceSpringMessagingAutoConfiguration.java create mode 100644 spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/messaging/TracingMethodMessageHandlerAdapter.java create mode 100644 spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/messaging/ITTracingMethodMessageHandlerAdapterTests.java create mode 100644 spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/messaging/SqsQueueMessageHandlerTests.java diff --git a/docs/src/main/asciidoc/spring-cloud-sleuth.adoc b/docs/src/main/asciidoc/spring-cloud-sleuth.adoc index c2c7fa3710..d54c01ed0e 100644 --- a/docs/src/main/asciidoc/spring-cloud-sleuth.adoc +++ b/docs/src/main/asciidoc/spring-cloud-sleuth.adoc @@ -1381,6 +1381,13 @@ To block this feature, set `spring.sleuth.messaging.jms.enabled` to `false`. IMPORTANT: We don't support baggage propagation for JMS +==== Spring Cloud AWS Messaging SQS + +We instrument `@SqsListener` which is provided by `org.springframework.cloud:spring-cloud-aws-messaging` +so that tracing headers get extracted from the message and a trace gets put into the context. + +To block this feature, set `spring.sleuth.messaging.sqs.enabled` to `false`. + === Zuul We instrument the Zuul Ribbon integration by enriching the Ribbon requests with tracing information. diff --git a/pom.xml b/pom.xml index 4fcdedd0c4..f8f02110ab 100644 --- a/pom.xml +++ b/pom.xml @@ -158,6 +158,13 @@ pom import + + org.springframework.cloud + spring-cloud-aws-dependencies + ${spring-cloud-aws.version} + pom + import + org.springframework.cloud spring-cloud-gateway-dependencies @@ -250,8 +257,8 @@ 2.1.7.RELEASE + 2.2.0.BUILD-SNAPSHOT false - 3.10.0 3.10.0 20.0 diff --git a/spring-cloud-sleuth-core/pom.xml b/spring-cloud-sleuth-core/pom.xml index 8f92944795..36d965bf92 100644 --- a/spring-cloud-sleuth-core/pom.xml +++ b/spring-cloud-sleuth-core/pom.xml @@ -82,6 +82,11 @@ org.springframework.cloud spring-cloud-commons + + org.springframework.cloud + spring-cloud-aws-messaging + true + org.springframework.cloud spring-cloud-starter-gateway diff --git a/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/messaging/TraceMessagingAutoConfiguration.java b/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/messaging/TraceMessagingAutoConfiguration.java index bab5700b3b..fe7835dfa3 100644 --- a/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/messaging/TraceMessagingAutoConfiguration.java +++ b/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/messaging/TraceMessagingAutoConfiguration.java @@ -17,12 +17,15 @@ package org.springframework.cloud.sleuth.instrument.messaging; import java.lang.reflect.Field; +import java.util.Collections; +import java.util.List; import brave.Span; import brave.Tracer; import brave.Tracing; import brave.jms.JmsTracing; import brave.kafka.clients.KafkaTracing; +import brave.propagation.Propagation; import brave.spring.rabbit.SpringRabbitTracing; import org.aopalliance.intercept.MethodInterceptor; import org.aopalliance.intercept.MethodInvocation; @@ -49,6 +52,8 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.cloud.aws.messaging.config.QueueMessageHandlerFactory; +import org.springframework.cloud.aws.messaging.listener.QueueMessageHandler; import org.springframework.cloud.sleuth.autoconfig.TraceAutoConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -62,6 +67,11 @@ import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter; import org.springframework.kafka.support.DefaultKafkaHeaderMapper; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessagingException; +import org.springframework.messaging.converter.MessageConverter; +import org.springframework.messaging.support.MessageHeaderAccessor; +import org.springframework.util.CollectionUtils; import org.springframework.util.ReflectionUtils; /** @@ -73,7 +83,8 @@ */ @Configuration @ConditionalOnBean(Tracing.class) -@AutoConfigureAfter({ TraceAutoConfiguration.class }) +@AutoConfigureAfter({ TraceAutoConfiguration.class, + TraceSpringMessagingAutoConfiguration.class }) @OnMessagingEnabled @EnableConfigurationProperties(SleuthMessagingProperties.class) public class TraceMessagingAutoConfiguration { @@ -188,6 +199,28 @@ TracingJmsBeanPostProcessor tracingJmsBeanPostProcessor(BeanFactory beanFactory) } + @Configuration + @ConditionalOnProperty(value = "spring.sleuth.messaging.sqs.enabled", + matchIfMissing = true) + @ConditionalOnClass(QueueMessageHandler.class) + protected static class SleuthSqsConfiguration { + + @Bean + TracingMethodMessageHandlerAdapter tracingMethodMessageHandlerAdapter( + Tracing tracing, + Propagation.Getter traceMessagePropagationGetter) { + return new TracingMethodMessageHandlerAdapter(tracing, + traceMessagePropagationGetter); + } + + @Bean + QueueMessageHandlerFactory sqsQueueMessageHandlerFactory( + TracingMethodMessageHandlerAdapter tracingMethodMessageHandlerAdapter) { + return new SqsQueueMessageHandlerFactory(tracingMethodMessageHandlerAdapter); + } + + } + } class SleuthRabbitBeanPostProcessor implements BeanPostProcessor { @@ -422,3 +455,50 @@ Object sleuthDefaultKafkaHeaderMapper(Object bean) { } } + +class SqsQueueMessageHandlerFactory extends QueueMessageHandlerFactory { + + private TracingMethodMessageHandlerAdapter handlerAdapter; + + SqsQueueMessageHandlerFactory(TracingMethodMessageHandlerAdapter handlerAdapter) { + this.handlerAdapter = handlerAdapter; + } + + @Override + public QueueMessageHandler createQueueMessageHandler() { + if (CollectionUtils.isEmpty(getMessageConverters())) { + return new SqsQueueMessageHandler(handlerAdapter, Collections.emptyList()); + } + return new SqsQueueMessageHandler(handlerAdapter, getMessageConverters()); + } + +} + +class SqsQueueMessageHandler extends QueueMessageHandler { + + // copied from QueueMessageHandler + private static final String LOGICAL_RESOURCE_ID = "LogicalResourceId"; + + private TracingMethodMessageHandlerAdapter handlerAdapter; + + SqsQueueMessageHandler(TracingMethodMessageHandlerAdapter handlerAdapter, + List messageConverters) { + super(messageConverters); + this.handlerAdapter = handlerAdapter; + } + + @Override + public void handleMessage(Message message) throws MessagingException { + handlerAdapter.wrapMethodMessageHandler(message, super::handleMessage, + this::messageSpanTagger); + } + + private void messageSpanTagger(Span span, Message message) { + span.remoteServiceName("sqs"); + if (message.getHeaders().get(LOGICAL_RESOURCE_ID) != null) { + span.tag("sqs.queue_url", + message.getHeaders().get(LOGICAL_RESOURCE_ID).toString()); + } + } + +} diff --git a/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/messaging/TraceSpringIntegrationAutoConfiguration.java b/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/messaging/TraceSpringIntegrationAutoConfiguration.java index 4071e9fe1c..cadf8677a6 100644 --- a/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/messaging/TraceSpringIntegrationAutoConfiguration.java +++ b/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/messaging/TraceSpringIntegrationAutoConfiguration.java @@ -22,7 +22,6 @@ import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; -import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.sleuth.autoconfig.TraceAutoConfiguration; @@ -44,7 +43,8 @@ @Configuration @ConditionalOnClass(GlobalChannelInterceptor.class) @ConditionalOnBean(Tracing.class) -@AutoConfigureAfter({ TraceAutoConfiguration.class }) +@AutoConfigureAfter({ TraceAutoConfiguration.class, + TraceSpringMessagingAutoConfiguration.class }) @OnMessagingEnabled @ConditionalOnProperty(value = "spring.sleuth.integration.enabled", matchIfMissing = true) @EnableConfigurationProperties(SleuthMessagingProperties.class) @@ -67,16 +67,4 @@ TracingChannelInterceptor traceChannelInterceptor(Tracing tracing, traceMessagePropagationGetter); } - @Bean - @ConditionalOnMissingBean - Propagation.Setter traceMessagePropagationSetter() { - return MessageHeaderPropagation.INSTANCE; - } - - @Bean - @ConditionalOnMissingBean - Propagation.Getter traceMessagePropagationGetter() { - return MessageHeaderPropagation.INSTANCE; - } - } diff --git a/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/messaging/TraceSpringMessagingAutoConfiguration.java b/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/messaging/TraceSpringMessagingAutoConfiguration.java new file mode 100644 index 0000000000..20ff81445e --- /dev/null +++ b/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/messaging/TraceSpringMessagingAutoConfiguration.java @@ -0,0 +1,46 @@ +/* + * Copyright 2013-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.sleuth.instrument.messaging; + +import brave.propagation.Propagation; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.messaging.support.MessageHeaderAccessor; + +@Configuration +@ConditionalOnClass(MessageHeaderAccessor.class) +@OnMessagingEnabled +@EnableConfigurationProperties(SleuthMessagingProperties.class) +class TraceSpringMessagingAutoConfiguration { + + @Bean + @ConditionalOnMissingBean + Propagation.Setter traceMessagePropagationSetter() { + return MessageHeaderPropagation.INSTANCE; + } + + @Bean + @ConditionalOnMissingBean + Propagation.Getter traceMessagePropagationGetter() { + return MessageHeaderPropagation.INSTANCE; + } + +} diff --git a/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/messaging/TracingMethodMessageHandlerAdapter.java b/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/messaging/TracingMethodMessageHandlerAdapter.java new file mode 100644 index 0000000000..e3defb22a6 --- /dev/null +++ b/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/messaging/TracingMethodMessageHandlerAdapter.java @@ -0,0 +1,109 @@ +/* + * Copyright 2013-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.sleuth.instrument.messaging; + +import java.util.function.BiConsumer; + +import brave.Span; +import brave.Tracer; +import brave.Tracing; +import brave.propagation.Propagation; +import brave.propagation.TraceContext; +import brave.propagation.TraceContextOrSamplingFlags; + +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.support.MessageHeaderAccessor; + +import static brave.Span.Kind.CONSUMER; + +/** + * Adds tracing extraction to an instance of + * {@link org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler} + * in a reusable way. When sub-classing a provider specific class of that type you would + * wrap the
super.handleMessage(...)
call with a call to this. See + * {@link org.springframework.cloud.sleuth.instrument.messaging.SqsQueueMessageHandler} + * for an example. + * + * This implementation also allows for supplying a {@link java.util.function.BiConsumer} + * instance that can be used to add queue specific tags and modifications to the span. + * + * @author Brian Devins-Suresh + */ +class TracingMethodMessageHandlerAdapter { + + private Tracing tracing; + + private Tracer tracer; + + private TraceContext.Extractor extractor; + + TracingMethodMessageHandlerAdapter(Tracing tracing, + Propagation.Getter traceMessagePropagationGetter) { + this.tracing = tracing; + this.tracer = tracing.tracer(); + this.extractor = tracing.propagation().extractor(traceMessagePropagationGetter); + } + + void wrapMethodMessageHandler(Message message, MessageHandler messageHandler, + BiConsumer> messageSpanTagger) { + TraceContextOrSamplingFlags extracted = extractAndClearHeaders(message); + + Span consumerSpan = tracer.nextSpan(extracted); + Span listenerSpan = tracer.newChild(consumerSpan.context()); + + if (!consumerSpan.isNoop()) { + consumerSpan.name("next-message").kind(CONSUMER); + if (messageSpanTagger != null) { + messageSpanTagger.accept(consumerSpan, message); + } + + // incur timestamp overhead only once + long timestamp = tracing.clock(consumerSpan.context()) + .currentTimeMicroseconds(); + consumerSpan.start(timestamp); + long consumerFinish = timestamp + 1L; // save a clock reading + consumerSpan.finish(consumerFinish); + + // not using scoped span as we want to start with a pre-configured time + listenerSpan.name("on-message").start(consumerFinish); + } + + try (Tracer.SpanInScope ws = tracer.withSpanInScope(listenerSpan)) { + messageHandler.handleMessage(message); + } + catch (Throwable t) { + listenerSpan.error(t); + throw t; + } + finally { + listenerSpan.finish(); + } + } + + private TraceContextOrSamplingFlags extractAndClearHeaders(Message message) { + MessageHeaderAccessor headers = MessageHeaderAccessor.getMutableAccessor(message); + TraceContextOrSamplingFlags extracted = extractor.extract(headers); + + for (String propagationKey : tracing.propagation().keys()) { + headers.removeHeader(propagationKey); + } + + return extracted; + } + +} diff --git a/spring-cloud-sleuth-core/src/main/resources/META-INF/spring.factories b/spring-cloud-sleuth-core/src/main/resources/META-INF/spring.factories index a394fdf861..725e2e30ea 100644 --- a/spring-cloud-sleuth-core/src/main/resources/META-INF/spring.factories +++ b/spring-cloud-sleuth-core/src/main/resources/META-INF/spring.factories @@ -24,10 +24,12 @@ org.springframework.cloud.sleuth.instrument.grpc.TraceGrpcAutoConfiguration,\ org.springframework.cloud.sleuth.instrument.messaging.SleuthKafkaStreamsConfiguration,\ org.springframework.cloud.sleuth.instrument.messaging.TraceMessagingAutoConfiguration,\ org.springframework.cloud.sleuth.instrument.messaging.TraceSpringIntegrationAutoConfiguration,\ +org.springframework.cloud.sleuth.instrument.messaging.TraceSpringMessagingAutoConfiguration,\ org.springframework.cloud.sleuth.instrument.messaging.websocket.TraceWebSocketAutoConfiguration,\ org.springframework.cloud.sleuth.instrument.opentracing.OpentracingAutoConfiguration,\ org.springframework.cloud.sleuth.instrument.redis.TraceRedisAutoConfiguration,\ org.springframework.cloud.sleuth.instrument.quartz.TraceQuartzAutoConfiguration + # Environment Post Processor org.springframework.boot.env.EnvironmentPostProcessor=\ org.springframework.cloud.sleuth.autoconfig.TraceEnvironmentPostProcessor diff --git a/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/messaging/ITTracingMethodMessageHandlerAdapterTests.java b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/messaging/ITTracingMethodMessageHandlerAdapterTests.java new file mode 100644 index 0000000000..42b7e54d3c --- /dev/null +++ b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/messaging/ITTracingMethodMessageHandlerAdapterTests.java @@ -0,0 +1,135 @@ +/* + * Copyright 2013-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.sleuth.instrument.messaging; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; + +import brave.Span; +import brave.Tracing; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.aws.messaging.listener.QueueMessageHandler; +import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.test.context.junit4.SpringRunner; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.NONE; + +@SpringBootTest( + classes = ITTracingMethodMessageHandlerAdapterTests.TestingConfiguration.class, + webEnvironment = NONE) +@RunWith(SpringRunner.class) +public class ITTracingMethodMessageHandlerAdapterTests { + + private static final String TRACE_ID = "12345678123456781234567812345678"; + + private static final String SPAN_ID = "1234567812345678"; + + @Autowired + ApplicationContext applicationContext; + + @Autowired + SqsQueueMessageHandlerFactory messageHandlerFactory; + + @Autowired + TestingMessageHandler testingMessageHandler; + + @Autowired + Tracing tracing; + + private QueueMessageHandler messageHandler; + + @Before + public void setup() { + messageHandler = messageHandlerFactory.createQueueMessageHandler(); + messageHandler.setApplicationContext(applicationContext); + messageHandler.afterPropertiesSet(); + } + + @Test + public void aSpanGetsPutIntoScopeWithoutHeadersOnTheMessage() { + AtomicReference probedSpan = new AtomicReference<>(); + testingMessageHandler.withTestProbe(((headers, s) -> { + probedSpan.set(tracing.tracer().currentSpan()); + })); + + messageHandler.handleMessage(new GenericMessage<>("message", + Collections.singletonMap("LogicalResourceId", "test"))); + + assertThat(probedSpan.get()).isNotNull(); + } + + @Test + public void theSpanThatIsInTheHeadersIsUsedForTheTraceScope() { + AtomicReference probedSpan = new AtomicReference<>(); + testingMessageHandler.withTestProbe(((headers, s) -> { + probedSpan.set(tracing.tracer().currentSpan()); + })); + + Map headers = new HashMap<>(); + headers.put("LogicalResourceId", "test"); + headers.put("X-B3-TraceId", TRACE_ID); + headers.put("X-B3-SpanId", SPAN_ID); + headers.put("X-B3-Sampled", "1"); + messageHandler.handleMessage(new GenericMessage<>("message", headers)); + + assertThat(probedSpan.get()).isNotNull(); + assertThat(probedSpan.get().context().traceIdString()).isEqualTo(TRACE_ID); + assertThat(probedSpan.get().context().sampled()).isTrue(); + } + + @EnableAutoConfiguration + @Configuration + static class TestingConfiguration { + + @Bean + TestingMessageHandler testingMessageHandler() { + return new TestingMessageHandler(); + } + + } + + static class TestingMessageHandler { + + private BiConsumer testProbe; + + void withTestProbe(BiConsumer consumer) { + this.testProbe = consumer; + } + + @SqsListener("test") + public void handle(MessageHeaders header, String payload) { + testProbe.accept(header, payload); + } + + } + +} diff --git a/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/messaging/SqsQueueMessageHandlerTests.java b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/messaging/SqsQueueMessageHandlerTests.java new file mode 100644 index 0000000000..beff3653da --- /dev/null +++ b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/messaging/SqsQueueMessageHandlerTests.java @@ -0,0 +1,73 @@ +/* + * Copyright 2013-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.sleuth.instrument.messaging; + +import java.util.Collections; +import java.util.function.BiConsumer; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; +import org.mockito.quality.Strictness; + +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.support.GenericMessage; + +import static org.assertj.core.api.Assertions.assertThat; + +public class SqsQueueMessageHandlerTests { + + @Rule + public MockitoRule rule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS); + + @Mock + TracingMethodMessageHandlerAdapter adapter; + + SqsQueueMessageHandler subject; + + @Before + public void setup() { + subject = new SqsQueueMessageHandler(adapter, Collections.emptyList()); + } + + @Test + public void sqsQueueMessageHandlerDelegatesToAdapter() { + ArgumentCaptor messageCapture = ArgumentCaptor.forClass(Message.class); + ArgumentCaptor handlerCapture = ArgumentCaptor + .forClass(MessageHandler.class); + ArgumentCaptor spanTaggerCapture = ArgumentCaptor + .forClass(BiConsumer.class); + Mockito.doNothing().when(adapter).wrapMethodMessageHandler( + messageCapture.capture(), handlerCapture.capture(), + spanTaggerCapture.capture()); + + subject.handleMessage(new GenericMessage<>("a")); + + Mockito.verify(adapter, Mockito.times(1)).wrapMethodMessageHandler(Mockito.any(), + Mockito.any(), Mockito.any()); + assertThat(messageCapture.getValue().getPayload().toString()).isEqualTo("a"); + assertThat(handlerCapture.getValue()).isNotNull(); + assertThat(spanTaggerCapture.getValue()).isNotNull(); + } + +}