diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/AbstractMqttClientManager.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/AbstractMqttClientManager.java index fae12599dab..e3a66b178d7 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/AbstractMqttClientManager.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/AbstractMqttClientManager.java @@ -62,6 +62,8 @@ public abstract class AbstractMqttClientManager implements ClientManager extends SmartLifecycle, MqttComponent { */ long DEFAULT_COMPLETION_TIMEOUT = 30_000L; + Long QUIESCENT_TIMEOUT = 30_000L; + /** * The default disconnect completion timeout in milliseconds. */ diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv3ClientManager.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv3ClientManager.java index 7839d039ffa..38e7505f766 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv3ClientManager.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv3ClientManager.java @@ -151,7 +151,7 @@ public void stop() { return; } try { - client.disconnectForcibly(getDisconnectCompletionTimeout()); + client.disconnectForcibly(getQuiescentTimeout(), getDisconnectCompletionTimeout()); if (getConnectionInfo().isAutomaticReconnect()) { MqttUtils.stopClientReconnectCycle(client); } diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv5ClientManager.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv5ClientManager.java index cdde26bc578..528e84b151a 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv5ClientManager.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv5ClientManager.java @@ -26,6 +26,7 @@ import org.eclipse.paho.mqttv5.common.MqttException; import org.eclipse.paho.mqttv5.common.MqttMessage; import org.eclipse.paho.mqttv5.common.packet.MqttProperties; +import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode; import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent; import org.springframework.integration.mqtt.support.MqttUtils; @@ -153,7 +154,8 @@ public void stop() { } try { - client.disconnectForcibly(getDisconnectCompletionTimeout()); + client.disconnectForcibly(getQuiescentTimeout(), getDisconnectCompletionTimeout(), + MqttReturnCode.RETURN_CODE_SUCCESS, new MqttProperties()); if (getConnectionInfo().isAutomaticReconnect()) { MqttUtils.stopClientReconnectCycle(client); } diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java index d1be00b5588..fe68ad5b16e 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java @@ -50,6 +50,7 @@ * @author Mikhail Polivakha * @author Artem Vozhdayenko * @author Jiri Soucek + * @author Glenn Renfro * * @since 4.0 * @@ -73,6 +74,8 @@ public abstract class AbstractMqttMessageDrivenChannelAdapter extends Mess private long disconnectCompletionTimeout = ClientManager.DISCONNECT_COMPLETION_TIMEOUT; + private long quiescentTimeout = ClientManager.QUIESCENT_TIMEOUT; + private boolean manualAcks; private ApplicationEventPublisher applicationEventPublisher; @@ -199,6 +202,20 @@ protected long getDisconnectCompletionTimeout() { return this.disconnectCompletionTimeout; } + /** + * Set the quiescentTimeout timeout when disconnecting. + * Default is {@link ClientManager#QUIESCENT_TIMEOUT} milliseconds. + * @param quiescentTimeout The timeout. + * @since 7.0.0 + */ + public void setQuiescentTimeout(long quiescentTimeout) { + this.quiescentTimeout = quiescentTimeout; + } + + protected long getQuiescentTimeout() { + return this.quiescentTimeout; + } + @Override protected void onInit() { super.onInit(); diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java index 28b5026a952..f56b9a53dc3 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java @@ -58,6 +58,7 @@ * @author Gary Russell * @author Artem Bilan * @author Artem Vozhdayenko + * @author Glenn Renfro * * @since 4.0 * @@ -227,7 +228,7 @@ protected void doStop() { } try { - this.client.disconnectForcibly(getDisconnectCompletionTimeout()); + this.client.disconnectForcibly(getQuiescentTimeout(), getDisconnectCompletionTimeout()); if (getConnectionInfo().isAutomaticReconnect()) { MqttUtils.stopClientReconnectCycle(this.client); } diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java index ec79783f67a..848053c7493 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java @@ -37,6 +37,7 @@ import org.eclipse.paho.mqttv5.common.MqttMessage; import org.eclipse.paho.mqttv5.common.MqttSubscription; import org.eclipse.paho.mqttv5.common.packet.MqttProperties; +import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode; import org.springframework.beans.factory.BeanCreationException; import org.springframework.context.ApplicationEventPublisher; @@ -81,6 +82,7 @@ * @author Lucas Bowler * @author Artem Vozhdayenko * @author Matthias Thoma + * @author Glenn Renfro * * @since 5.5.5 * @@ -296,7 +298,8 @@ protected void doStop() { } if (getClientManager() == null) { - this.mqttClient.disconnectForcibly(getDisconnectCompletionTimeout()); + this.mqttClient.disconnectForcibly(getQuiescentTimeout(), getDisconnectCompletionTimeout(), + MqttReturnCode.RETURN_CODE_SUCCESS, new MqttProperties()); if (getConnectionInfo().isAutomaticReconnect()) { MqttUtils.stopClientReconnectCycle(this.mqttClient); } diff --git a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/BackToBackAdapterTests.java b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/BackToBackAdapterTests.java index ba30b442ffb..2cf37627eea 100644 --- a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/BackToBackAdapterTests.java +++ b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/BackToBackAdapterTests.java @@ -64,6 +64,7 @@ /** * @author Gary Russell * @author Artem Bilan + * @author Glenn Renfro * * @since 4.0 * @@ -73,6 +74,10 @@ @DirtiesContext public class BackToBackAdapterTests implements MosquittoContainerTest { + private static final long QUIESCENT_TIMEOUT = 1; + + private static final long DISCONNECT_COMPLETION_TIMEOUT = 1L; + @TempDir static File folder; @@ -108,9 +113,7 @@ public void testSingleTopic() { MqttPahoMessageDrivenChannelAdapter inbound = new MqttPahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(), "si-test-in", "mqtt-foo"); QueueChannel outputChannel = new QueueChannel(); - inbound.setOutputChannel(outputChannel); - inbound.setTaskScheduler(taskScheduler); - inbound.setBeanFactory(mock(BeanFactory.class)); + initializeInboundAdapter(inbound, outputChannel); inbound.afterPropertiesSet(); inbound.start(); adapter.handleMessage(new GenericMessage<>("foo")); @@ -147,9 +150,7 @@ private void testJsonCommon(String... trusted) { MqttPahoMessageDrivenChannelAdapter inbound = new MqttPahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(), "si-test-in", "mqtt-foo"); QueueChannel outputChannel = new QueueChannel(); - inbound.setOutputChannel(outputChannel); - inbound.setTaskScheduler(taskScheduler); - inbound.setBeanFactory(mock(BeanFactory.class)); + initializeInboundAdapter(inbound, outputChannel); inbound.setConverter(converter); inbound.afterPropertiesSet(); inbound.start(); @@ -178,9 +179,7 @@ public void testAddRemoveTopic() { MqttPahoMessageDrivenChannelAdapter inbound = new MqttPahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(), "si-test-in"); QueueChannel outputChannel = new QueueChannel(); - inbound.setOutputChannel(outputChannel); - inbound.setTaskScheduler(taskScheduler); - inbound.setBeanFactory(mock(BeanFactory.class)); + initializeInboundAdapter(inbound, outputChannel); inbound.afterPropertiesSet(); inbound.start(); inbound.addTopic("mqtt-foo"); @@ -226,9 +225,7 @@ public void testTwoTopics() { new MqttPahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(), "si-test-in", "mqtt-foo", "mqtt-bar"); QueueChannel outputChannel = new QueueChannel(); - inbound.setOutputChannel(outputChannel); - inbound.setTaskScheduler(taskScheduler); - inbound.setBeanFactory(mock(BeanFactory.class)); + initializeInboundAdapter(inbound, outputChannel); inbound.afterPropertiesSet(); inbound.start(); adapter.handleMessage(new GenericMessage<>("foo")); @@ -261,9 +258,7 @@ public void testAsync() throws Exception { MqttPahoMessageDrivenChannelAdapter inbound = new MqttPahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(), "si-test-in", "mqtt-foo"); QueueChannel outputChannel = new QueueChannel(); - inbound.setOutputChannel(outputChannel); - inbound.setTaskScheduler(taskScheduler); - inbound.setBeanFactory(mock(BeanFactory.class)); + initializeInboundAdapter(inbound, outputChannel); inbound.afterPropertiesSet(); inbound.start(); GenericMessage message = new GenericMessage<>("foo"); @@ -299,9 +294,7 @@ public void testAsyncPersisted() throws Exception { new MqttPahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(), "si-test-in", "mqtt-foo", "mqtt-bar"); QueueChannel outputChannel = new QueueChannel(); - inbound.setOutputChannel(outputChannel); - inbound.setTaskScheduler(taskScheduler); - inbound.setBeanFactory(mock(BeanFactory.class)); + initializeInboundAdapter(inbound, outputChannel); inbound.afterPropertiesSet(); inbound.start(); Message message1 = new GenericMessage<>("foo"); @@ -396,6 +389,14 @@ public void onApplicationEvent(MqttSubscribedEvent event) { } + private static void initializeInboundAdapter(MqttPahoMessageDrivenChannelAdapter inbound, QueueChannel outputChannel) { + inbound.setOutputChannel(outputChannel); + inbound.setTaskScheduler(taskScheduler); + inbound.setQuiescentTimeout(QUIESCENT_TIMEOUT); + inbound.setDisconnectCompletionTimeout(DISCONNECT_COMPLETION_TIMEOUT); + inbound.setBeanFactory(mock(BeanFactory.class)); + } + private class EventPublisher implements ApplicationEventPublisher { private volatile MqttMessageDeliveredEvent delivered; diff --git a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/ClientManagerBackToBackTests.java b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/ClientManagerBackToBackTests.java index 24baa784a52..db1c2f60305 100644 --- a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/ClientManagerBackToBackTests.java +++ b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/ClientManagerBackToBackTests.java @@ -63,6 +63,10 @@ */ class ClientManagerBackToBackTests implements MosquittoContainerTest { + private static final long QUIESCENT_TIMEOUT = 1L; + + private static final long DISCONNECT_COMPLETION_TIMEOUT = 1L; + @Test void testSameV3ClientIdWorksForPubAndSub() throws Exception { testSubscribeAndPublish(Mqttv3Config.class, Mqttv3Config.TOPIC_NAME, Mqttv3Config.subscribedLatch); @@ -191,7 +195,10 @@ public Mqttv3ClientManager mqttv3ClientManager() { MqttConnectOptions connectionOptions = new MqttConnectOptions(); connectionOptions.setServerURIs(new String[] {MosquittoContainerTest.mqttUrl()}); connectionOptions.setAutomaticReconnect(true); - return new Mqttv3ClientManager(connectionOptions, "client-manager-client-id-v3"); + Mqttv3ClientManager result = new Mqttv3ClientManager(connectionOptions, "client-manager-client-id-v3"); + result.setQuiescentTimeout(QUIESCENT_TIMEOUT); + result.setDisconnectCompletionTimeout(DISCONNECT_COMPLETION_TIMEOUT); + return result; } @Bean @@ -234,7 +241,10 @@ public Mqttv3ClientManager mqttv3ClientManager() { MqttConnectOptions connectionOptions = new MqttConnectOptions(); connectionOptions.setServerURIs(new String[] {MosquittoContainerTest.mqttUrl()}); connectionOptions.setAutomaticReconnect(true); - return new Mqttv3ClientManager(connectionOptions, "client-manager-client-id-v3-reconnect"); + Mqttv3ClientManager result = new Mqttv3ClientManager(connectionOptions, "client-manager-client-id-v3-reconnect"); + result.setQuiescentTimeout(QUIESCENT_TIMEOUT); + result.setDisconnectCompletionTimeout(DISCONNECT_COMPLETION_TIMEOUT); + return result; } @Bean @@ -269,7 +279,10 @@ public Mqttv3ClientManager mqttv3ClientManager() { MqttConnectOptions connectionOptions = new MqttConnectOptions(); connectionOptions.setServerURIs(new String[] {MosquittoContainerTest.mqttUrl()}); connectionOptions.setAutomaticReconnect(true); - return new Mqttv3ClientManager(connectionOptions, "client-manager-client-id-v3"); + Mqttv3ClientManager result = new Mqttv3ClientManager(connectionOptions, "client-manager-client-id-v3"); + result.setQuiescentTimeout(QUIESCENT_TIMEOUT); + result.setDisconnectCompletionTimeout(DISCONNECT_COMPLETION_TIMEOUT); + return result; } @Bean diff --git a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java index 0f3caf231b4..3715ca86c38 100644 --- a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java +++ b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java @@ -100,6 +100,7 @@ * @author Gary Russell * @author Artem Bilan * @author Artem Vozhdayenko + * @author Glenn Renfro * * @since 4.0 * @@ -519,7 +520,7 @@ public void testDifferentQos() throws Exception { new DirectFieldAccessor(adapter).setPropertyValue("running", Boolean.TRUE); adapter.stop(); - verify(client).disconnectForcibly(5_000L); + verify(client).disconnectForcibly(30_000L, 5_000L); } @Test @@ -589,14 +590,14 @@ private void verifyUnsubscribe(IMqttAsyncClient client) throws Exception { verify(client).connect(any(MqttConnectOptions.class)); verify(client).subscribe(any(String[].class), any(int[].class), any()); verify(client).unsubscribe(any(String[].class)); - verify(client).disconnectForcibly(anyLong()); + verify(client).disconnectForcibly(anyLong(), anyLong()); } private void verifyNotUnsubscribe(IMqttAsyncClient client) throws Exception { verify(client).connect(any(MqttConnectOptions.class)); verify(client).subscribe(any(String[].class), any(int[].class), any()); verify(client, never()).unsubscribe(any(String[].class)); - verify(client).disconnectForcibly(anyLong()); + verify(client).disconnectForcibly(anyLong(), anyLong()); } @Configuration diff --git a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java index 3cb16af8594..4d634a772fd 100644 --- a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java +++ b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java @@ -57,6 +57,7 @@ * @author Gary Russell * @author Artem Bilan * @author Mikhail Polivakha + * @author Glenn Renfro * * @since 5.5.5 * @@ -65,6 +66,10 @@ @DirtiesContext public class Mqttv5BackToBackTests implements MosquittoContainerTest { + private static final long QUIESCENT_TIMEOUT = 1; + + private static final long DISCONNECT_COMPLETION_TIMEOUT = 1L; + @Autowired @Qualifier("mqttOutFlow.input") private MessageChannel mqttOutFlowInput; @@ -94,7 +99,7 @@ public void testNoNpeIsNotThrownInCaseDoInitIsNotInvokedBeforeTopicRemoval() { @Test public void testSimpleMqttv5Interaction() { - String testPayload = "foo"; + String testPayload = "datakey"; this.mqttOutFlowInput.send( MessageBuilder.withPayload(testPayload) @@ -213,6 +218,8 @@ public IntegrationFlow mqttInFlow() { new Mqttv5PahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(), "mqttv5SIin", mqttSubscription); messageProducer.setPayloadType(String.class); + messageProducer.setQuiescentTimeout(QUIESCENT_TIMEOUT); + messageProducer.setDisconnectCompletionTimeout(DISCONNECT_COMPLETION_TIMEOUT); messageProducer.setMessageConverter(mqttStringToBytesConverter()); messageProducer.setManualAcks(true); diff --git a/src/reference/antora/modules/ROOT/pages/whats-new.adoc b/src/reference/antora/modules/ROOT/pages/whats-new.adoc index 8ab291574ee..4ec95e23c10 100644 --- a/src/reference/antora/modules/ROOT/pages/whats-new.adoc +++ b/src/reference/antora/modules/ROOT/pages/whats-new.adoc @@ -44,3 +44,9 @@ See xref:redis.adoc#redis-lock-registry[Redis Lock Registry] for more informatio === Hazelcast Changes Previously deprecated classes in the `spring-integation-hazelcast` module, such as `LeaderInitiator`, `HazelcastMembershipListener`, `HazelcastLocalInstanceRegistrar` and `HazelcastLockRegistry`, are now removed due to not supported CP-subsystem in Hazelcast library for Open Source. + +[[x7.0-mqtt-changes]] +=== MQTT Changes + +The `AbstractMqttMessageDrivenChannelAdapter` and `ClientManager` implementations now expose a `quiescentTimeout` option which is propagated in their `stop()` method down to the `disconnectForcibly()` API of the MQTT Paho clients. +See xref:mqtt.adoc[] for more information. \ No newline at end of file