Skip to content

Commit

Permalink
Some various polishing
Browse files Browse the repository at this point in the history
* Add `RotatingServerAdvice.StandardRotationPolicy.getCurrent()` method
for better end-user experience when this class is extended
* Add NPE protection into the MQTT Channel Adapters.
Some code style polishing for them

**Cherry-pick to 5.0.x**
  • Loading branch information
artembilan authored and garyrussell committed Oct 10, 2018
1 parent 0f5cfd4 commit 4126411
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
*
* @author Gary Russell
* @author Michael Forstner
* @author Artem Bilan
*
* @since 5.0.7
*
Expand Down Expand Up @@ -118,7 +119,7 @@ public static class StandardRotationPolicy implements RotationPolicy {

protected final Log logger = LogFactory.getLog(getClass());

private final DelegatingSessionFactory<?> factory;
protected final DelegatingSessionFactory<?> factory;

private final List<KeyDirectory> keyDirectories = new ArrayList<>();

Expand Down Expand Up @@ -170,6 +171,10 @@ protected boolean isFair() {
return this.fair;
}

protected KeyDirectory getCurrent() {
return this.current;
}

@Override
public void beforeReceive(MessageSource<?> source) {
if (this.fair || !this.initialized) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ public class MqttPahoMessageDrivenChannelAdapter extends AbstractMqttMessageDriv

private final MqttPahoClientFactory clientFactory;

private IMqttClient client;

private long completionTimeout = DEFAULT_COMPLETION_TIMEOUT;

private int recoveryInterval = DEFAULT_RECOVERY_INTERVAL;

private volatile IMqttClient client;

private volatile ScheduledFuture<?> reconnectFuture;

private volatile boolean connected;
Expand Down Expand Up @@ -162,6 +162,7 @@ protected synchronized void doStop() {
if (this.consumerStopAction.equals(ConsumerStopAction.UNSUBSCRIBE_ALWAYS)
|| (this.consumerStopAction.equals(ConsumerStopAction.UNSUBSCRIBE_CLEAN)
&& this.cleanSession)) {

this.client.unsubscribe(getTopic());
}
}
Expand Down Expand Up @@ -249,8 +250,8 @@ private synchronized void connectAndSubscribe() throws MqttException {
if (grantedQos[i] != requestedQos[i]) {
if (logger.isWarnEnabled()) {
logger.warn("Granted QOS different to Requested QOS; topics: " + Arrays.toString(topics)
+ " requested: " + Arrays.toString(requestedQos)
+ " granted: " + Arrays.toString(grantedQos));
+ " requested: " + Arrays.toString(requestedQos)
+ " granted: " + Arrays.toString(grantedQos));
}
break;
}
Expand Down Expand Up @@ -294,7 +295,8 @@ private synchronized void cancelReconnect() {
}
}

private void scheduleReconnect() {
private synchronized void scheduleReconnect() {
cancelReconnect();
try {
this.reconnectFuture = getTaskScheduler().schedule(() -> {
try {
Expand Down Expand Up @@ -324,12 +326,14 @@ public synchronized void connectionLost(Throwable cause) {
if (isRunning()) {
this.logger.error("Lost connection: " + cause.getMessage() + "; retrying...");
this.connected = false;
try {
this.client.setCallback(null);
this.client.close();
}
catch (MqttException e) {
// NOSONAR
if (this.client != null) {
try {
this.client.setCallback(null);
this.client.close();
}
catch (MqttException e) {
// NOSONAR
}
}
this.client = null;
scheduleReconnect();
Expand All @@ -340,7 +344,7 @@ public synchronized void connectionLost(Throwable cause) {
}

@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
public void messageArrived(String topic, MqttMessage mqttMessage) {
Message<?> message = this.getConverter().toMessage(topic, mqttMessage);
try {
sendMessage(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
*
* @author Gary Russell
* @author Artem Bilan
*
* @since 4.0
*
*/
Expand All @@ -51,13 +52,13 @@ public class MqttPahoMessageHandler extends AbstractMqttMessageHandler

private final MqttPahoClientFactory clientFactory;

private IMqttAsyncClient client;

private boolean async;

private boolean asyncEvents;

private volatile ApplicationEventPublisher applicationEventPublisher;
private ApplicationEventPublisher applicationEventPublisher;

private volatile IMqttAsyncClient client;

/**
* Use this constructor for a single url (although it may be overridden
Expand Down Expand Up @@ -181,7 +182,6 @@ private synchronized IMqttAsyncClient checkConnection() throws MqttException {
catch (MqttException e) {
if (client != null) {
client.close();
client = null;
}
throw new MessagingException("Failed to connect", e);
}
Expand Down Expand Up @@ -215,18 +215,20 @@ private void sendDeliveryComplete(IMqttDeliveryToken token) {
@Override
public synchronized void connectionLost(Throwable cause) {
logger.error("Lost connection; will attempt reconnect on next request");
try {
this.client.setCallback(null);
this.client.close();
}
catch (MqttException e) {
// NOSONAR
if (this.client != null) {
try {
this.client.setCallback(null);
this.client.close();
}
catch (MqttException e) {
// NOSONAR
}
this.client = null;
}
this.client = null;
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
public void messageArrived(String topic, MqttMessage message) {

}

Expand Down

0 comments on commit 4126411

Please sign in to comment.