Skip to content

Commit

Permalink
Upgrade to MQTT Paho 1.2.2
Browse files Browse the repository at this point in the history
- fix mock tests for internal client changes
- reduce stop wait for completion time

**cherry-pick to 5.2.x**

* Remove stack trace from test and convert to assertJ
  • Loading branch information
garyrussell committed Feb 27, 2020
1 parent 87c8e47 commit 2d3d37e
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 18 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ ext {
mockitoVersion = '3.2.4'
mongoDriverVersion = '4.0.0-beta1'
mysqlVersion = '8.0.19'
pahoMqttClientVersion = '1.2.0'
pahoMqttClientVersion = '1.2.2'
postgresVersion = '42.2.10'
reactorVersion = 'Dysprosium-SR4'
resilience4jVersion = '1.3.1'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -51,15 +51,19 @@
public class MqttPahoMessageDrivenChannelAdapter extends AbstractMqttMessageDrivenChannelAdapter
implements MqttCallback, ApplicationEventPublisherAware {

public static final long DEFAULT_COMPLETION_TIMEOUT = 30000L;
public static final long DEFAULT_COMPLETION_TIMEOUT = 30_000L;

private static final int DEFAULT_RECOVERY_INTERVAL = 10000;
public static final long STOP_COMPLETION_TIMEOUT = 5_000L;

private static final int DEFAULT_RECOVERY_INTERVAL = 10_000;

private final MqttPahoClientFactory clientFactory;

private int recoveryInterval = DEFAULT_RECOVERY_INTERVAL;

private volatile long completionTimeout = DEFAULT_COMPLETION_TIMEOUT;
private long completionTimeout = DEFAULT_COMPLETION_TIMEOUT;

private long stopCompletionTimeout = STOP_COMPLETION_TIMEOUT;

private volatile IMqttClient client;

Expand Down Expand Up @@ -122,6 +126,16 @@ public void setCompletionTimeout(long completionTimeout) {
this.completionTimeout = completionTimeout;
}

/**
* Set the completion timeout wnen stopping. Not settable using the namespace.
* Default {@value #STOP_COMPLETION_TIMEOUT} milliseconds.
* @param completionTimeout The timeout.
* @since 5.2.5
*/
public void setStopCompletionTimeout(long completionTimeout) {
this.stopCompletionTimeout = completionTimeout;
}

/**
* The time (ms) to wait between reconnection attempts.
* Default {@value #DEFAULT_RECOVERY_INTERVAL}.
Expand Down Expand Up @@ -170,7 +184,7 @@ protected synchronized void doStop() {
logger.error("Exception while unsubscribing", e);
}
try {
this.client.disconnectForcibly(this.completionTimeout);
this.client.disconnectForcibly(this.stopCompletionTimeout);
}
catch (MqttException e) {
logger.error("Exception while disconnecting", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,10 +17,12 @@
package org.springframework.integration.mqtt;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.BDDMockito.willReturn;
Expand Down Expand Up @@ -48,8 +50,10 @@

import org.aopalliance.intercept.MethodInterceptor;
import org.apache.commons.logging.Log;
import org.assertj.core.api.Condition;
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallback;
Expand Down Expand Up @@ -435,11 +439,13 @@ public void testSubscribeFailure() throws Exception {
new DirectFieldAccessor(client).setPropertyValue("aClient", aClient);
willAnswer(new CallsRealMethods()).given(client).connect(any(MqttConnectOptions.class));
willAnswer(new CallsRealMethods()).given(client).subscribe(any(String[].class), any(int[].class));
willAnswer(new CallsRealMethods()).given(client).subscribe(any(String[].class), any(int[].class),
(IMqttMessageListener[]) isNull());
willReturn(alwaysComplete).given(aClient).connect(any(MqttConnectOptions.class), any(), any());

IMqttToken token = mock(IMqttToken.class);
given(token.getGrantedQos()).willReturn(new int[] { 0x80 });
willReturn(token).given(aClient).subscribe(any(String[].class), any(int[].class), any(), any());
willReturn(token).given(aClient).subscribe(any(String[].class), any(int[].class), isNull(), isNull(), any());

MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("foo", "bar", factory,
"baz", "fix");
Expand All @@ -449,15 +455,12 @@ public void testSubscribeFailure() throws Exception {
method.set(m);
}, m -> m.getName().equals("connectAndSubscribe"));
assertThat(method.get()).isNotNull();
try {
method.get().invoke(adapter);
fail("Expected InvocationTargetException");
}
catch (InvocationTargetException e) {
assertThat(e.getCause()).isInstanceOf(MqttException.class);
assertThat(((MqttException) e.getCause()).getReasonCode())
.isEqualTo((int) MqttException.REASON_CODE_SUBSCRIBE_FAILED);
}
Condition<InvocationTargetException> subscribeFailed = new Condition<>(ex ->
((MqttException) ex.getCause()).getReasonCode() == MqttException.REASON_CODE_SUBSCRIBE_FAILED,
"expected the reason code to be REASON_CODE_SUBSCRIBE_FAILED");
assertThatExceptionOfType(InvocationTargetException.class).isThrownBy(() -> method.get().invoke(adapter))
.withCauseInstanceOf(MqttException.class)
.is(subscribeFailed);
}

@Test
Expand Down Expand Up @@ -485,11 +488,13 @@ public void testDifferentQos() throws Exception {
new DirectFieldAccessor(client).setPropertyValue("aClient", aClient);
willAnswer(new CallsRealMethods()).given(client).connect(any(MqttConnectOptions.class));
willAnswer(new CallsRealMethods()).given(client).subscribe(any(String[].class), any(int[].class));
willAnswer(new CallsRealMethods()).given(client).subscribe(any(String[].class), any(int[].class),
(IMqttMessageListener[]) isNull());
willReturn(alwaysComplete).given(aClient).connect(any(MqttConnectOptions.class), any(), any());

IMqttToken token = mock(IMqttToken.class);
given(token.getGrantedQos()).willReturn(new int[] { 2, 0 });
willReturn(token).given(aClient).subscribe(any(String[].class), any(int[].class), any(), any());
willReturn(token).given(aClient).subscribe(any(String[].class), any(int[].class), isNull(), isNull(), any());

MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("foo", "bar", factory,
"baz", "fix");
Expand All @@ -506,6 +511,10 @@ public void testDifferentQos() throws Exception {
verify(logger, atLeastOnce())
.warn("Granted QOS different to Requested QOS; topics: [baz, fix] requested: [1, 1] granted: [2, 0]");
verify(client).setTimeToWait(30_000L);

new DirectFieldAccessor(adapter).setPropertyValue("running", Boolean.TRUE);
adapter.stop();
verify(client).disconnectForcibly(5_000L);
}

private MqttPahoMessageDrivenChannelAdapter buildAdapterIn(final IMqttClient client, Boolean cleanSession,
Expand Down

0 comments on commit 2d3d37e

Please sign in to comment.