Skip to content

Commit

Permalink
GH-3367: Separate timeouts in BarrierMH
Browse files Browse the repository at this point in the history
Fixes #3367

Introduce a `requestTimeout` and `triggerTimeout` for `BarrierMessageHandler`
For instance, if an HTTP request sends a message to the barrier,
it should time out after 1min if no trigger message is received.
If the trigger message then arrives late and the HTTP request is no longer waiting,
it shouldn't wait for 1min before discarding the request but do so immediately.
  • Loading branch information
micheljung authored and artembilan committed Aug 21, 2020
1 parent 3fb6567 commit 6780bbd
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 47 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 the original author or authors.
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -39,14 +39,15 @@
* the timeout occurs. Only one thread with a particular correlation (result of invoking
* the {@link CorrelationStrategy}) can be suspended at a time. If the inbound thread does
* not arrive before the trigger thread, the latter is suspended until it does, or the
* timeout occurs.
* timeout occurs. Separate timeouts may be configured for request and trigger messages.
* <p>
* The default {@link CorrelationStrategy} is a {@link HeaderAttributeCorrelationStrategy}.
* <p>
* The default output processor is a {@link DefaultAggregatingMessageGroupProcessor}.
*
* @author Gary Russell
* @author Artem Bilan
* @author Michel Jung
*
* @since 4.2
*/
Expand All @@ -57,7 +58,9 @@ public class BarrierMessageHandler extends AbstractReplyProducingMessageHandler

private final Map<Object, Thread> inProcess = new ConcurrentHashMap<>();

private final long timeout;
private final long requestTimeout;

private final long triggerTimeout;

private final CorrelationStrategy correlationStrategy;

Expand All @@ -70,48 +73,100 @@ public class BarrierMessageHandler extends AbstractReplyProducingMessageHandler
/**
* Construct an instance with the provided timeout and default correlation and
* output strategies.
* @param timeout the timeout in milliseconds.
* @param timeout the timeout in milliseconds for both, request and trigger messages.
*/
public BarrierMessageHandler(long timeout) {
this(timeout, new DefaultAggregatingMessageGroupProcessor());
this(timeout, timeout);
}

/**
* Construct an instance with the provided timeout and output processor, and default
* correlation strategy.
* @param timeout the timeout in milliseconds.
* @param timeout the timeout in milliseconds for both, request and trigger messages.
* @param outputProcessor the output {@link MessageGroupProcessor}.
*/
public BarrierMessageHandler(long timeout, MessageGroupProcessor outputProcessor) {
this(timeout, outputProcessor, null);
this(timeout, timeout, outputProcessor);
}

/**
* Construct an instance with the provided timeout and correlation strategy, and default
* output processor.
* @param timeout the timeout in milliseconds.
* @param timeout the timeout in milliseconds for both, request and trigger messages.
* @param correlationStrategy the correlation strategy.
*/
public BarrierMessageHandler(long timeout, CorrelationStrategy correlationStrategy) {
this(timeout, new DefaultAggregatingMessageGroupProcessor(), correlationStrategy);
this(timeout, timeout, correlationStrategy);
}

/**
* Construct an instance with the provided timeout and output processor, and default
* correlation strategy.
* @param timeout the timeout in milliseconds.
* @param timeout the timeout in milliseconds for both, request and trigger messages.
* @param outputProcessor the output {@link MessageGroupProcessor}.
* @param correlationStrategy the correlation strategy.
*/
public BarrierMessageHandler(long timeout, MessageGroupProcessor outputProcessor,
CorrelationStrategy correlationStrategy) {

this(timeout, timeout, outputProcessor, correlationStrategy);
}

/**
* Construct an instance with the provided timeouts and default correlation and
* output strategies.
* @param requestTimeout the timeout in milliseconds when waiting for trigger message.
* @param triggerTimeout the timeout in milliseconds when waiting for a request message.
* @since 5.4
*/
public BarrierMessageHandler(long requestTimeout, long triggerTimeout) {
this(requestTimeout, triggerTimeout, new DefaultAggregatingMessageGroupProcessor());
}

/**
* Construct an instance with the provided timeout and output processor, and default
* correlation strategy.
* @param requestTimeout the timeout in milliseconds when waiting for trigger message.
* @param triggerTimeout the timeout in milliseconds when waiting for a request message.
* @param outputProcessor the output {@link MessageGroupProcessor}.
* @since 5.4
*/
public BarrierMessageHandler(long requestTimeout, long triggerTimeout, MessageGroupProcessor outputProcessor) {
this(requestTimeout, triggerTimeout, outputProcessor, null);
}

/**
* Construct an instance with the provided timeout and correlation strategy, and default
* output processor.
* @param requestTimeout the timeout in milliseconds when waiting for trigger message.
* @param triggerTimeout the timeout in milliseconds when waiting for a request message.
* @param correlationStrategy the correlation strategy.
* @since 5.4
*/
public BarrierMessageHandler(long requestTimeout, long triggerTimeout, CorrelationStrategy correlationStrategy) {
this(requestTimeout, triggerTimeout, new DefaultAggregatingMessageGroupProcessor(), correlationStrategy);
}

/**
* Construct an instance with the provided timeout and output processor, and default
* correlation strategy.
* @param requestTimeout the timeout in milliseconds when waiting for trigger message.
* @param triggerTimeout the timeout in milliseconds when waiting for a request message.
* @param outputProcessor the output {@link MessageGroupProcessor}.
* @param correlationStrategy the correlation strategy.
* @since 5.4
*/
public BarrierMessageHandler(long requestTimeout, long triggerTimeout, MessageGroupProcessor outputProcessor,
CorrelationStrategy correlationStrategy) {

Assert.notNull(outputProcessor, "'messageGroupProcessor' cannot be null");
this.messageGroupProcessor = outputProcessor;
this.correlationStrategy = (correlationStrategy == null
? new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID)
: correlationStrategy);
this.timeout = timeout;
this.correlationStrategy =
correlationStrategy == null
? new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID)
: correlationStrategy;
this.requestTimeout = requestTimeout;
this.triggerTimeout = triggerTimeout;
}

/**
Expand Down Expand Up @@ -163,12 +218,12 @@ protected Object handleRequestMessage(Message<?> requestMessage) {
}
Thread existing = this.inProcess.putIfAbsent(key, Thread.currentThread());
if (existing != null) {
throw new MessagingException(requestMessage, "Correlation key ("
+ key + ") is already in use by " + existing.getName());
throw new MessagingException(requestMessage,
"Correlation key (" + key + ") is already in use by " + existing.getName());
}
SynchronousQueue<Message<?>> syncQueue = createOrObtainQueue(key);
try {
Message<?> releaseMessage = syncQueue.poll(this.timeout, TimeUnit.MILLISECONDS);
Message<?> releaseMessage = syncQueue.poll(this.requestTimeout, TimeUnit.MILLISECONDS);
if (releaseMessage != null) {
return processRelease(key, requestMessage, releaseMessage);
}
Expand Down Expand Up @@ -228,7 +283,7 @@ public void trigger(Message<?> message) {
}
SynchronousQueue<Message<?>> syncQueue = createOrObtainQueue(key);
try {
if (!syncQueue.offer(message, this.timeout, TimeUnit.MILLISECONDS)) {
if (!syncQueue.offer(message, this.triggerTimeout, TimeUnit.MILLISECONDS)) {
this.logger.error("Suspending thread timed out or did not arrive within timeout for: " + message);
this.suspensions.remove(key);
MessageChannel messageChannel = getDiscardChannel();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 the original author or authors.
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,16 +27,20 @@
* Parser for {@code <int:barrier/>}.
*
* @author Gary Russell
* @author Artem Bilan
*
* @since 4.2
*/
public class BarrierParser extends AbstractConsumerEndpointParser {

@Override
protected BeanDefinitionBuilder parseHandler(Element element, ParserContext parserContext) {
BeanDefinitionBuilder handlerBuilder = BeanDefinitionBuilder
.genericBeanDefinition(BarrierMessageHandler.class);
BeanDefinitionBuilder handlerBuilder = BeanDefinitionBuilder.genericBeanDefinition(BarrierMessageHandler.class);
handlerBuilder.addConstructorArgValue(element.getAttribute("timeout"));
String triggerTimeout = element.getAttribute("trigger-timeout");
if (StringUtils.hasText(triggerTimeout)) {
handlerBuilder.addConstructorArgValue(triggerTimeout);
}
String processor = element.getAttribute("output-processor");
if (StringUtils.hasText(processor)) {
handlerBuilder.addConstructorArgReference(processor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1736,8 +1736,7 @@
<xsd:complexType>
<xsd:choice minOccurs="0" maxOccurs="2">
<xsd:element name="transactional" type="transactionalType" minOccurs="0" maxOccurs="1"/>
<xsd:element name="request-handler-advice-chain" type="handlerAdviceChainType" minOccurs="0"
maxOccurs="1"/>
<xsd:element name="request-handler-advice-chain" type="handlerAdviceChainType" minOccurs="0"/>
<xsd:element ref="poller"/>
</xsd:choice>
<xsd:attributeGroup ref="inputOutputChannelGroup"/>
Expand All @@ -1751,7 +1750,15 @@
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="requires-reply" type="xsd:string" use="optional">
<xsd:attribute name="trigger-timeout" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
The time in milliseconds to suspend the trigger thread.
If not provided a 'timeout' is used.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="requires-reply" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Specify whether the barrier must return a non-null value. This value will be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@

<int:barrier id="barrier1" input-channel="in" output-channel="out" correlation-strategy-expression="'foo'"
requires-reply="true" discard-channel="discards"
timeout="10000">
timeout="10000"
trigger-timeout="5000">
<int:poller fixed-delay="100" />
</int:barrier>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 the original author or authors.
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,8 +18,7 @@

import static org.assertj.core.api.Assertions.assertThat;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.aggregator.BarrierMessageHandler;
Expand All @@ -35,16 +34,16 @@
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

/**
* @author Gary Russell
* @author Artem Bilan
*
* @since 4.2
*
*/
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
@SpringJUnitConfig
@DirtiesContext
public class BarrierParserTests {

Expand All @@ -71,8 +70,8 @@ public class BarrierParserTests {

@Test
public void parserTestsWithMessage() {
this.in.send(new GenericMessage<String>("foo"));
this.release.send(new GenericMessage<String>("bar"));
this.in.send(new GenericMessage<>("foo"));
this.release.send(new GenericMessage<>("bar"));
Message<?> received = out.receive(10000);
assertThat(received).isNotNull();
this.barrier1.stop();
Expand All @@ -82,7 +81,8 @@ public void parserTestsWithMessage() {
public void parserFieldPopulationTests() {
BarrierMessageHandler handler = TestUtils.getPropertyValue(this.barrier1, "handler",
BarrierMessageHandler.class);
assertThat(TestUtils.getPropertyValue(handler, "timeout")).isEqualTo(10000L);
assertThat(TestUtils.getPropertyValue(handler, "requestTimeout")).isEqualTo(10000L);
assertThat(TestUtils.getPropertyValue(handler, "triggerTimeout")).isEqualTo(5000L);
assertThat(TestUtils.getPropertyValue(handler, "requiresReply", Boolean.class)).isTrue();
assertThat(TestUtils.getPropertyValue(this.barrier2, "handler.correlationStrategy"))
.isInstanceOf(HeaderAttributeCorrelationStrategy.class);
Expand Down
21 changes: 9 additions & 12 deletions src/reference/asciidoc/barrier.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,27 @@

Sometimes, we need to suspend a message flow thread until some other asynchronous event occurs.
For example, consider an HTTP request that publishes a message to RabbitMQ.
We might wish to not reply to the user until the RabbitMQ broker has issued an acknowledgment that the message was
received.
We might wish to not reply to the user until the RabbitMQ broker has issued an acknowledgment that the message was received.

In version 4.2, Spring Integration introduced the `<barrier/>` component for this purpose.
The underlying `MessageHandler` is the `BarrierMessageHandler`.
This class also implements
`MessageTriggerAction`, in which a message passed to the `trigger()` method releases a corresponding thread in the
`handleRequestMessage()` method (if present).
This class also implements `MessageTriggerAction`, in which a message passed to the `trigger()` method releases a corresponding thread in the `handleRequestMessage()` method (if present).

The suspended thread and trigger thread are correlated by invoking a `CorrelationStrategy` on the messages.
When a message is sent to the `input-channel`, the thread is suspended for up to `timeout` milliseconds, waiting for
a corresponding trigger message.
When a message is sent to the `input-channel`, the thread is suspended for up to `requestTimeout` milliseconds, waiting for a corresponding trigger message.
The default correlation strategy uses the `IntegrationMessageHeaderAccessor.CORRELATION_ID` header.
When a trigger message arrives with the same correlation, the thread is released.
The message sent to the `output-channel` after release is constructed by using a `MessageGroupProcessor`.
By default, the message is a `Collection<?>` of the two payloads, and the headers are merged by using a
`DefaultAggregatingMessageGroupProcessor`.
By default, the message is a `Collection<?>` of the two payloads, and the headers are merged by using a `DefaultAggregatingMessageGroupProcessor`.

CAUTION: If the `trigger()` method is invoked first (or after the main thread times out), it is suspended for up to `timeout` waiting for the suspending message to arrive.
CAUTION: If the `trigger()` method is invoked first (or after the main thread times out), it is suspended for up to `triggerTimeout` waiting for the suspending message to arrive.
If you do not want to suspend the trigger thread, consider handing off to a `TaskExecutor` instead so that its thread is suspended instead.

NOTE: Prior version 5.4, there was only one `timeout` option for both request and trigger messages, but in some use-case it is better to have different timeouts for those actions.
Therefore `requestTimeout` and `triggerTimeout` options have been introduced.

The `requires-reply` property determines the action to take if the suspended thread times out before the trigger message arrives.
By default, it is `false`, which means the endpoint returns `null`, the flow ends, and the thread returns to the
caller.
By default, it is `false`, which means the endpoint returns `null`, the flow ends, and the thread returns to the caller.
When `true`, a `ReplyRequiredException` is thrown.

You can call the `trigger()` method programmatically (obtain the bean reference by using the name, `barrier.handler` -- where `barrier` is the bean name of the barrier endpoint).
Expand Down
2 changes: 2 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ See <<./aggregator.adoc#aggregator-expiring-groups, Aggregator Expiring Groups>>

The legacy metrics that were replaced by Micrometer meters have been removed.

The <<./barrier.adoc#barrier,Thread Barrier>> has now two separate timeout options: `requestTimeout` and `triggerTimeout`.

[[x5.4-tcp]]
=== TCP Changes

Expand Down

0 comments on commit 6780bbd

Please sign in to comment.