Skip to content

Latest commit

 

History

History
79 lines (64 loc) · 4.31 KB

File metadata and controls

79 lines (64 loc) · 4.31 KB

Thread Barrier

Sometimes, we need to suspend a message flow thread until some other asynchronous event occurs. For example, consider an HTTP request that publishes a message to RabbitMQ. We might wish to not reply to the user until the RabbitMQ broker has issued an acknowledgment that the message was received.

In version 4.2, Spring Integration introduced the <barrier/> component for this purpose. The underlying MessageHandler is the BarrierMessageHandler. This class also implements MessageTriggerAction, in which a message passed to the trigger() method releases a corresponding thread in the handleRequestMessage() method (if present).

The suspended thread and trigger thread are correlated by invoking a CorrelationStrategy on the messages. When a message is sent to the input-channel, the thread is suspended for up to requestTimeout milliseconds, waiting for a corresponding trigger message. The default correlation strategy uses the IntegrationMessageHeaderAccessor.CORRELATION_ID header. When a trigger message arrives with the same correlation, the thread is released. The message sent to the output-channel after release is constructed by using a MessageGroupProcessor. By default, the message is a Collection<?> of the two payloads, and the headers are merged by using a DefaultAggregatingMessageGroupProcessor.

Caution
If the trigger() method is invoked first (or after the main thread times out), it is suspended for up to triggerTimeout waiting for the suspending message to arrive. If you do not want to suspend the trigger thread, consider handing off to a TaskExecutor instead so that its thread is suspended instead.
Note
Prior version 5.4, there was only one timeout option for both request and trigger messages, but in some use-case it is better to have different timeouts for those actions. Therefore requestTimeout and triggerTimeout options have been introduced.

The requires-reply property determines the action to take if the suspended thread times out before the trigger message arrives. By default, it is false, which means the endpoint returns null, the flow ends, and the thread returns to the caller. When true, a ReplyRequiredException is thrown.

You can call the trigger() method programmatically (obtain the bean reference by using the name, barrier.handler — where barrier is the bean name of the barrier endpoint). Alternatively, you can configure an <outbound-channel-adapter/> to trigger the release.

Important
Only one thread can be suspended with the same correlation. The same correlation can be used multiple times but only once concurrently. An exception is thrown if a second thread arrives with the same correlation.

The following example shows how to use a custom header for correlation:

Java
@ServiceActivator(inputChannel="in")
@Bean
public BarrierMessageHandler barrier(MessageChannel out, MessageChannel lateTriggerChannel) {
    BarrierMessageHandler barrier = new BarrierMessageHandler(10000);
    barrier.setOutputChannel(out());
    barrier.setDiscardChannel(lateTriggerChannel);
    return barrier;
}

@ServiceActivator (inputChannel="release")
@Bean
public MessageHandler releaser(MessageTriggerAction barrier) {
    return barrier::trigger;
}
XML
<int:barrier id="barrier1" input-channel="in" output-channel="out"
        correlation-strategy-expression="headers['myHeader']"
        output-processor="myOutputProcessor"
        discard-channel="lateTriggerChannel"
        timeout="10000">
</int:barrier>

<int:outbound-channel-adapter channel="release" ref="barrier1.handler" method="trigger" />

Depending on which one has a message arrive first, either the thread sending a message to in or the thread sending a message to release waits for up to ten seconds until the other message arrives. When the message is released, the out channel is sent a message that combines the result of invoking the custom MessageGroupProcessor bean, named myOutputProcessor. If the main thread times out and a trigger arrives later, you can configure a discard channel to which the late trigger is sent.

For an example of this component, see the barrier sample application.