diff --git a/instrumentation/spring-rabbit/src/test/java/brave/spring/rabbit/ITSpringRabbitTracing.java b/instrumentation/spring-rabbit/src/test/java/brave/spring/rabbit/ITSpringRabbitTracing.java index 9c1a66a0eb..6fd55de547 100644 --- a/instrumentation/spring-rabbit/src/test/java/brave/spring/rabbit/ITSpringRabbitTracing.java +++ b/instrumentation/spring-rabbit/src/test/java/brave/spring/rabbit/ITSpringRabbitTracing.java @@ -5,7 +5,9 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -37,12 +39,12 @@ import zipkin2.DependencyLink; import zipkin2.Span; import zipkin2.internal.DependencyLinker; -import zipkin2.reporter.Reporter; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.entry; import static org.assertj.core.groups.Tuple.tuple; import static zipkin2.Span.Kind.CONSUMER; +import static zipkin2.Span.Kind.PRODUCER; public class ITSpringRabbitTracing { @@ -64,15 +66,18 @@ public class ITSpringRabbitTracing { testFixture.produceMessage(); testFixture.awaitMessageConsumed(); - assertThat(testFixture.producerSpans).hasSize(1); - assertThat(testFixture.consumerSpans).hasSize(2); + List allSpans = new ArrayList<>(); + allSpans.add(testFixture.producerSpans.take()); + allSpans.add(testFixture.consumerSpans.take()); + allSpans.add(testFixture.consumerSpans.take()); - String originatingTraceId = testFixture.producerSpans.get(0).traceId(); - String consumerSpanId = testFixture.consumerSpans.get(0).id(); + String originatingTraceId = allSpans.get(0).traceId(); + String consumerSpanId = allSpans.get(1).id(); - assertThat(testFixture.consumerSpans) + assertThat(allSpans) .extracting(Span::kind, Span::traceId, Span::parentId) .containsExactly( + tuple(PRODUCER, originatingTraceId, null), tuple(CONSUMER, originatingTraceId, originatingTraceId), tuple(null, originatingTraceId, consumerSpanId) ); @@ -91,9 +96,11 @@ public class ITSpringRabbitTracing { testFixture.produceMessage(); testFixture.awaitMessageConsumed(); - assertThat(testFixture.consumerSpans).hasSize(2); + List consumerSpans = new ArrayList<>(); + consumerSpans.add(testFixture.consumerSpans.take()); + consumerSpans.add(testFixture.consumerSpans.take()); - assertThat(testFixture.consumerSpans) + assertThat(consumerSpans) .filteredOn(s -> s.kind() == CONSUMER) .flatExtracting(s -> s.tags().entrySet()) .containsOnly( @@ -102,7 +109,7 @@ public class ITSpringRabbitTracing { entry("rabbit.queue", "test-queue") ); - assertThat(testFixture.consumerSpans) + assertThat(consumerSpans) .filteredOn(s -> s.kind() != CONSUMER) .flatExtracting(s -> s.tags().entrySet()) .isEmpty(); @@ -113,7 +120,7 @@ public class ITSpringRabbitTracing { testFixture.produceMessage(); testFixture.awaitMessageConsumed(); - Span span1 = testFixture.consumerSpans.get(0), span2 = testFixture.consumerSpans.get(1); + Span span1 = testFixture.consumerSpans.take(), span2 = testFixture.consumerSpans.take(); Span consumerSpan = span1.kind() == Span.Kind.CONSUMER ? span1 : span2; Span listenerSpan = consumerSpan == span1 ? span2 : span1; @@ -126,8 +133,9 @@ public class ITSpringRabbitTracing { testFixture.awaitMessageConsumed(); List allSpans = new ArrayList<>(); - allSpans.addAll(testFixture.consumerSpans); - allSpans.addAll(testFixture.producerSpans); + allSpans.add(testFixture.producerSpans.take()); + allSpans.add(testFixture.consumerSpans.take()); + allSpans.add(testFixture.consumerSpans.take()); List links = new DependencyLinker().putTrace(allSpans.iterator()).link(); assertThat(links).extracting("parent", "child").containsExactly( @@ -140,9 +148,11 @@ public class ITSpringRabbitTracing { testFixture.produceMessageFromDefault(); testFixture.awaitMessageConsumed(); - assertThat(testFixture.consumerSpans).hasSize(2); + List consumerSpans = new ArrayList<>(); + consumerSpans.add(testFixture.producerSpans.take()); + consumerSpans.add(testFixture.consumerSpans.take()); - assertThat(testFixture.consumerSpans) + assertThat(consumerSpans) .filteredOn(s -> s.kind() == CONSUMER) .flatExtracting(s -> s.tags().entrySet()) .containsOnly( @@ -151,7 +161,7 @@ public class ITSpringRabbitTracing { entry("rabbit.queue", "test-queue") ); - assertThat(testFixture.consumerSpans) + assertThat(consumerSpans) .filteredOn(s -> s.kind() != CONSUMER) .flatExtracting(s -> s.tags().entrySet()) .isEmpty(); @@ -162,11 +172,14 @@ public class ITSpringRabbitTracing { testFixture.produceMessage(); testFixture.awaitMessageConsumed(); - assertThat(testFixture.consumerSpans).hasSize(2); + List allSpans = new ArrayList<>(); + allSpans.add(testFixture.producerSpans.take()); + allSpans.add(testFixture.consumerSpans.take()); + allSpans.add(testFixture.consumerSpans.take()); - assertThat(testFixture.consumerSpans) + assertThat(allSpans) .extracting(Span::name) - .containsExactly("next-message", "on-message"); + .containsExactly("publish", "next-message", "on-message"); } @Configuration @@ -200,11 +213,11 @@ public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) { @Configuration public static class RabbitProducerConfig { @Bean - public Tracing tracing(Reporter reporter) { + public Tracing tracing(BlockingQueue producerSpans) { return Tracing.newBuilder() .localServiceName("spring-amqp-producer") .sampler(Sampler.ALWAYS_SAMPLE) - .spanReporter(reporter) + .spanReporter(producerSpans::add) .build(); } @@ -214,13 +227,8 @@ public SpringRabbitTracing springRabbitTracing(Tracing tracing) { } @Bean - public Reporter reporter(List spans) { - return spans::add; - } - - @Bean - public List producerSpans() { - return new ArrayList<>(); + public BlockingQueue producerSpans() { + return new LinkedBlockingQueue<>(); } @Bean @@ -262,11 +270,11 @@ public HelloWorldProducer tracingRabbitProducer_decorate( @Configuration public static class RabbitConsumerConfig { @Bean - public Tracing tracing(List spans) { + public Tracing tracing(BlockingQueue consumerSpans) { return Tracing.newBuilder() .localServiceName("spring-amqp-consumer") .sampler(Sampler.ALWAYS_SAMPLE) - .spanReporter(spans::add) + .spanReporter(consumerSpans::add) .build(); } @@ -276,8 +284,8 @@ public SpringRabbitTracing springRabbitTracing(Tracing tracing) { } @Bean - public List consumerSpans() { - return new ArrayList<>(); + public BlockingQueue consumerSpans() { + return new LinkedBlockingQueue<>(); } @Bean @@ -285,11 +293,11 @@ public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( ConnectionFactory connectionFactory, SpringRabbitTracing springRabbitTracing ) { - SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = + SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory(); - simpleRabbitListenerContainerFactory.setConnectionFactory(connectionFactory); + rabbitListenerContainerFactory.setConnectionFactory(connectionFactory); return springRabbitTracing.decorateSimpleRabbitListenerContainerFactory( - simpleRabbitListenerContainerFactory + rabbitListenerContainerFactory ); } @@ -340,17 +348,16 @@ public CountDownLatch getCountDownLatch() { } private static class ITSpringAmqpTracingTestFixture { - - private ApplicationContext producerContext; - private ApplicationContext consumerContext; - private List producerSpans; - private List consumerSpans; + ApplicationContext producerContext; + ApplicationContext consumerContext; + BlockingQueue producerSpans; + BlockingQueue consumerSpans; ITSpringAmqpTracingTestFixture() { producerContext = producerSpringContext(); consumerContext = consumerSpringContext(); - producerSpans = (List) producerContext.getBean("producerSpans"); - consumerSpans = (List) consumerContext.getBean("consumerSpans"); + producerSpans = (BlockingQueue) producerContext.getBean("producerSpans"); + consumerSpans = (BlockingQueue) consumerContext.getBean("consumerSpans"); } private void reset() {