Skip to content
This repository has been archived by the owner on Sep 21, 2020. It is now read-only.

Commit

Permalink
[mqtt] Fix several issues after loss of connection (#6485)
Browse files Browse the repository at this point in the history
* fix wrong future handling in handleCommand

Signed-off-by: Jan N. Klug <jan.n.klug@rub.de>
  • Loading branch information
J-N-K authored and David Gräff committed Dec 2, 2019
1 parent 194551d commit 3ae6a41
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 11 deletions.
Expand Up @@ -121,15 +121,21 @@ public void handleCommand(ChannelUID channelUID, Command command) {
}

if (data.isReadOnly()) {
logger.warn("Channel {} is a read-only channel, ignoring command {}", channelUID, command);
logger.trace("Channel {} is a read-only channel, ignoring command {}", channelUID, command);
return;
}

final CompletableFuture<Boolean> future = data.publishValue(command);
future.exceptionally(e -> {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, e.getLocalizedMessage());
return false;
}).thenRun(() -> logger.debug("Successfully published value {} to topic {}", command, data.getCommandTopic()));
future.handle((v, ex) -> {
if (ex != null) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, ex.getLocalizedMessage());
logger.debug("Failed publishing value {} to topic {}: {}", command, data.getCommandTopic(),
ex.getMessage());
} else {
logger.debug("Successfully published value {} to topic {}", command, data.getCommandTopic());
}
return null;
});
}

@Override
Expand Down Expand Up @@ -216,7 +222,7 @@ public void dispose() {
try {
unsubscribeAll().get(500, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
logger.warn("unsubcription on disposal failed for {}: ", thing.getUID(), e);
logger.warn("unsubscription on disposal failed for {}: ", thing.getUID(), e);
}
connection = null;
super.dispose();
Expand Down
Expand Up @@ -53,8 +53,7 @@ public class ChannelState implements MqttMessageSubscriber {
protected final Value cachedValue;

// Runtime variables
@Nullable
private MqttBrokerConnection connection;
private @Nullable MqttBrokerConnection connection;
protected final List<ChannelStateTransformation> transformationsIn = new ArrayList<>();
protected final List<ChannelStateTransformation> transformationsOut = new ArrayList<>();
private @Nullable ChannelStateUpdateListener channelStateUpdateListener;
Expand Down Expand Up @@ -288,8 +287,11 @@ private void receivedOrTimeout() {
*/
public CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection, ScheduledExecutorService scheduler,
int timeout) {
if (hasSubscribed) {
// if the connection is still the same, the subscription is still present, otherwise we need to renew
if (hasSubscribed && connection.equals(this.connection)) {
return CompletableFuture.completedFuture(null);
} else {
hasSubscribed = false;
}

this.connection = connection;
Expand Down
Expand Up @@ -99,13 +99,14 @@ protected void stop() {
public void dispose() {
// Remove all state descriptions of this handler
channelStateByChannelUID.forEach((uid, state) -> stateDescProvider.remove(uid));
channelStateByChannelUID.clear();
super.dispose();
// there is a design flaw, we can't clean up our stuff because it is needed by the super-class on disposal for unsubscribing
channelStateByChannelUID.clear();
}

@Override
public CompletableFuture<Void> unsubscribeAll() {
return CompletableFuture.allOf(channelStateByChannelUID.values().stream().map(channel -> channel.stop())
return CompletableFuture.allOf(channelStateByChannelUID.values().stream().map(ChannelState::stop)
.toArray(CompletableFuture[]::new));
}

Expand Down

0 comments on commit 3ae6a41

Please sign in to comment.