Skip to content

Commit

Permalink
Fix race condition in the IntObservZipkinTests
Browse files Browse the repository at this point in the history
The `QueueChannel.receive()` may be fulfilled before
an observation is stopped in the `MessageHandler`

* Rework `IntegrationObservabilityZipkinTests` configuration to add
a `HandleMessageAdvice` to wait on the `CountDownLatch` before asserting spans
* Exclude an `adviceChain` attribute from the `ServiceActivatorAnnotationPostProcessor`
since an advice can be applied for the consumer endpoint, not the MH directly.
The `ConsumerEndpointFactoryBean` does the proper decision to apply advice onto MH
or just around its `handleMessage()` method
  • Loading branch information
artembilan committed Sep 20, 2022
1 parent 91a8b14 commit 145c845
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
public class ServiceActivatorAnnotationPostProcessor extends AbstractMethodAnnotationPostProcessor<ServiceActivator> {

public ServiceActivatorAnnotationPostProcessor() {
this.messageHandlerAttributes.addAll(Arrays.asList("outputChannel", "requiresReply", "adviceChain"));
this.messageHandlerAttributes.addAll(Arrays.asList("outputChannel", "requiresReply"));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,23 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.BridgeTo;
import org.springframework.integration.annotation.EndpointId;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.channel.interceptor.ObservationPropagationChannelInterceptor;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.config.EnableIntegrationManagement;
import org.springframework.integration.config.GlobalChannelInterceptor;
import org.springframework.integration.handler.BridgeHandler;
import org.springframework.integration.handler.advice.HandleMessageAdvice;
import org.springframework.integration.support.MutableMessage;
import org.springframework.integration.support.MutableMessageBuilder;
import org.springframework.messaging.Message;
Expand Down Expand Up @@ -79,6 +85,9 @@ public SampleTestRunnerConsumer yourCode() {
Message<?> receive = replyChannel.receive(10_000);
assertThat(receive).isNotNull()
.extracting("payload").isEqualTo("test data");
var configuration = applicationContext.getBean(ObservationIntegrationTestConfiguration.class);

assertThat(configuration.observedHandlerLatch.await(10, TimeUnit.SECONDS)).isTrue();
}

SpansAssert.assertThat(bb.getFinishedSpans())
Expand All @@ -105,19 +114,40 @@ public SampleTestRunnerConsumer yourCode() {
@EnableIntegrationManagement
public static class ObservationIntegrationTestConfiguration {

CountDownLatch observedHandlerLatch = new CountDownLatch(1);

@Bean
@GlobalChannelInterceptor
public ChannelInterceptor observationPropagationInterceptor(ObservationRegistry observationRegistry) {
return new ObservationPropagationChannelInterceptor(observationRegistry);
}

@Bean
@BridgeTo(poller = @Poller(fixedDelay = "100"))
@EndpointId("observedEndpoint")
public PollableChannel queueChannel() {
return new QueueChannel();
}

@Bean
@EndpointId("observedEndpoint")
@ServiceActivator(inputChannel = "queueChannel",
poller = @Poller(fixedDelay = "100"),
adviceChain = "observedHandlerAdvice")
BridgeHandler bridgeHandler() {
return new BridgeHandler();
}

@Bean
HandleMessageAdvice observedHandlerAdvice() {
return invocation -> {
try {
return invocation.proceed();
}
finally {
this.observedHandlerLatch.countDown();
}
};
}

}

}

0 comments on commit 145c845

Please sign in to comment.