Skip to content

Commit

Permalink
Fixes test flake on latest JDK
Browse files Browse the repository at this point in the history
This switches to a blocking queue to solve a race in rabbit tests
  • Loading branch information
Adrian Cole authored and adriancole committed Apr 2, 2018
1 parent 821a158 commit 2eb0d90
Showing 1 changed file with 48 additions and 41 deletions.
Expand Up @@ -5,7 +5,9 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
Expand Down Expand Up @@ -37,12 +39,12 @@
import zipkin2.DependencyLink;
import zipkin2.Span;
import zipkin2.internal.DependencyLinker;
import zipkin2.reporter.Reporter;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry;
import static org.assertj.core.groups.Tuple.tuple;
import static zipkin2.Span.Kind.CONSUMER;
import static zipkin2.Span.Kind.PRODUCER;

public class ITSpringRabbitTracing {

Expand All @@ -64,15 +66,18 @@ public class ITSpringRabbitTracing {
testFixture.produceMessage();
testFixture.awaitMessageConsumed();

assertThat(testFixture.producerSpans).hasSize(1);
assertThat(testFixture.consumerSpans).hasSize(2);
List<Span> allSpans = new ArrayList<>();
allSpans.add(testFixture.producerSpans.take());
allSpans.add(testFixture.consumerSpans.take());
allSpans.add(testFixture.consumerSpans.take());

String originatingTraceId = testFixture.producerSpans.get(0).traceId();
String consumerSpanId = testFixture.consumerSpans.get(0).id();
String originatingTraceId = allSpans.get(0).traceId();
String consumerSpanId = allSpans.get(1).id();

assertThat(testFixture.consumerSpans)
assertThat(allSpans)
.extracting(Span::kind, Span::traceId, Span::parentId)
.containsExactly(
tuple(PRODUCER, originatingTraceId, null),
tuple(CONSUMER, originatingTraceId, originatingTraceId),
tuple(null, originatingTraceId, consumerSpanId)
);
Expand All @@ -91,9 +96,11 @@ public class ITSpringRabbitTracing {
testFixture.produceMessage();
testFixture.awaitMessageConsumed();

assertThat(testFixture.consumerSpans).hasSize(2);
List<Span> consumerSpans = new ArrayList<>();
consumerSpans.add(testFixture.consumerSpans.take());
consumerSpans.add(testFixture.consumerSpans.take());

assertThat(testFixture.consumerSpans)
assertThat(consumerSpans)
.filteredOn(s -> s.kind() == CONSUMER)
.flatExtracting(s -> s.tags().entrySet())
.containsOnly(
Expand All @@ -102,7 +109,7 @@ public class ITSpringRabbitTracing {
entry("rabbit.queue", "test-queue")
);

assertThat(testFixture.consumerSpans)
assertThat(consumerSpans)
.filteredOn(s -> s.kind() != CONSUMER)
.flatExtracting(s -> s.tags().entrySet())
.isEmpty();
Expand All @@ -113,7 +120,7 @@ public class ITSpringRabbitTracing {
testFixture.produceMessage();
testFixture.awaitMessageConsumed();

Span span1 = testFixture.consumerSpans.get(0), span2 = testFixture.consumerSpans.get(1);
Span span1 = testFixture.consumerSpans.take(), span2 = testFixture.consumerSpans.take();
Span consumerSpan = span1.kind() == Span.Kind.CONSUMER ? span1 : span2;
Span listenerSpan = consumerSpan == span1 ? span2 : span1;

Expand All @@ -126,8 +133,9 @@ public class ITSpringRabbitTracing {
testFixture.awaitMessageConsumed();

List<Span> allSpans = new ArrayList<>();
allSpans.addAll(testFixture.consumerSpans);
allSpans.addAll(testFixture.producerSpans);
allSpans.add(testFixture.producerSpans.take());
allSpans.add(testFixture.consumerSpans.take());
allSpans.add(testFixture.consumerSpans.take());

List<DependencyLink> links = new DependencyLinker().putTrace(allSpans.iterator()).link();
assertThat(links).extracting("parent", "child").containsExactly(
Expand All @@ -140,9 +148,11 @@ public class ITSpringRabbitTracing {
testFixture.produceMessageFromDefault();
testFixture.awaitMessageConsumed();

assertThat(testFixture.consumerSpans).hasSize(2);
List<Span> consumerSpans = new ArrayList<>();
consumerSpans.add(testFixture.producerSpans.take());
consumerSpans.add(testFixture.consumerSpans.take());

assertThat(testFixture.consumerSpans)
assertThat(consumerSpans)
.filteredOn(s -> s.kind() == CONSUMER)
.flatExtracting(s -> s.tags().entrySet())
.containsOnly(
Expand All @@ -151,7 +161,7 @@ public class ITSpringRabbitTracing {
entry("rabbit.queue", "test-queue")
);

assertThat(testFixture.consumerSpans)
assertThat(consumerSpans)
.filteredOn(s -> s.kind() != CONSUMER)
.flatExtracting(s -> s.tags().entrySet())
.isEmpty();
Expand All @@ -162,11 +172,14 @@ public class ITSpringRabbitTracing {
testFixture.produceMessage();
testFixture.awaitMessageConsumed();

assertThat(testFixture.consumerSpans).hasSize(2);
List<Span> allSpans = new ArrayList<>();
allSpans.add(testFixture.producerSpans.take());
allSpans.add(testFixture.consumerSpans.take());
allSpans.add(testFixture.consumerSpans.take());

assertThat(testFixture.consumerSpans)
assertThat(allSpans)
.extracting(Span::name)
.containsExactly("next-message", "on-message");
.containsExactly("publish", "next-message", "on-message");
}

@Configuration
Expand Down Expand Up @@ -200,11 +213,11 @@ public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
@Configuration
public static class RabbitProducerConfig {
@Bean
public Tracing tracing(Reporter<Span> reporter) {
public Tracing tracing(BlockingQueue<Span> producerSpans) {
return Tracing.newBuilder()
.localServiceName("spring-amqp-producer")
.sampler(Sampler.ALWAYS_SAMPLE)
.spanReporter(reporter)
.spanReporter(producerSpans::add)
.build();
}

Expand All @@ -214,13 +227,8 @@ public SpringRabbitTracing springRabbitTracing(Tracing tracing) {
}

@Bean
public Reporter<Span> reporter(List<Span> spans) {
return spans::add;
}

@Bean
public List<Span> producerSpans() {
return new ArrayList<>();
public BlockingQueue<Span> producerSpans() {
return new LinkedBlockingQueue<>();
}

@Bean
Expand Down Expand Up @@ -262,11 +270,11 @@ public HelloWorldProducer tracingRabbitProducer_decorate(
@Configuration
public static class RabbitConsumerConfig {
@Bean
public Tracing tracing(List<Span> spans) {
public Tracing tracing(BlockingQueue<Span> consumerSpans) {
return Tracing.newBuilder()
.localServiceName("spring-amqp-consumer")
.sampler(Sampler.ALWAYS_SAMPLE)
.spanReporter(spans::add)
.spanReporter(consumerSpans::add)
.build();
}

Expand All @@ -276,20 +284,20 @@ public SpringRabbitTracing springRabbitTracing(Tracing tracing) {
}

@Bean
public List<Span> consumerSpans() {
return new ArrayList<>();
public BlockingQueue<Span> consumerSpans() {
return new LinkedBlockingQueue<>();
}

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory,
SpringRabbitTracing springRabbitTracing
) {
SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory =
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory =
new SimpleRabbitListenerContainerFactory();
simpleRabbitListenerContainerFactory.setConnectionFactory(connectionFactory);
rabbitListenerContainerFactory.setConnectionFactory(connectionFactory);
return springRabbitTracing.decorateSimpleRabbitListenerContainerFactory(
simpleRabbitListenerContainerFactory
rabbitListenerContainerFactory
);
}

Expand Down Expand Up @@ -340,17 +348,16 @@ public CountDownLatch getCountDownLatch() {
}

private static class ITSpringAmqpTracingTestFixture {

private ApplicationContext producerContext;
private ApplicationContext consumerContext;
private List<Span> producerSpans;
private List<Span> consumerSpans;
ApplicationContext producerContext;
ApplicationContext consumerContext;
BlockingQueue<Span> producerSpans;
BlockingQueue<Span> consumerSpans;

ITSpringAmqpTracingTestFixture() {
producerContext = producerSpringContext();
consumerContext = consumerSpringContext();
producerSpans = (List<Span>) producerContext.getBean("producerSpans");
consumerSpans = (List<Span>) consumerContext.getBean("consumerSpans");
producerSpans = (BlockingQueue<Span>) producerContext.getBean("producerSpans");
consumerSpans = (BlockingQueue<Span>) consumerContext.getBean("consumerSpans");
}

private void reset() {
Expand Down

0 comments on commit 2eb0d90

Please sign in to comment.